diff --git a/.github/workflows/flex-interactive.yml b/.github/workflows/flex-interactive.yml index 505946aa4b1d..379f5faf2780 100644 --- a/.github/workflows/flex-interactive.yml +++ b/.github/workflows/flex-interactive.yml @@ -56,7 +56,7 @@ jobs: # install gsctl python3 -m pip install ${GITHUB_WORKSPACE}/python/dist/*.whl # launch service: 8080 for coordinator http port; 7687 for cypher port; - gsctl instance deploy --type interactive --image-registry graphscope --image-tag latest --interactive-config ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml + gsctl instance deploy --type interactive --image-registry graphscope --image-tag latest --cypher-port 7688 --interactive-config ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml sleep 20 # test python3 -m pip install --no-cache-dir pytest pytest-cov pytest-timeout pytest-xdist @@ -66,6 +66,10 @@ jobs: --exitfirst \ $(dirname $(python3 -c "import graphscope.gsctl as gsctl; print(gsctl.__file__)"))/tests/test_interactive.py + # test coordinator + res=`curl http://127.0.0.1:8080/api/v1/service` + echo $res | grep 7688 || exit 1 + # destroy instance gsctl instance destroy --type interactive -y diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index 39c86bae49e8..345f1f61bbe5 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -626,3 +626,15 @@ jobs: SCHEMA_FILE=${GITHUB_WORKSPACE}/flex/tests/rt_mutable_graph/movie_schema_test.yaml BULK_LOAD_FILE=${GITHUB_WORKSPACE}/flex/tests/rt_mutable_graph/movie_import_test.yaml GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/ + + - name: Test graph loading with different delimiter + env: + GS_TEST_DIR: ${{ github.workspace }}/gstest/ + FLEX_DATA_DIR: ${{ github.workspace }}/gstest/flex/modern_graph_tab_delimiter/ + run: | + rm -rf /tmp/csr-data-dir/ + cd ${GITHUB_WORKSPACE}/flex/build/ + 
SCHEMA_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/graph.yaml + BULK_LOAD_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/bulk_load.yaml + sed -i 's/|/\\t/g' ${BULK_LOAD_FILE} + GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/ diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index b504850f8f7f..c690d3f33809 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -181,7 +181,12 @@ data: export GRPC_PORT=${GROOT_GRPC_PORT} export GREMLIN_PORT=${GROOT_GREMLIN_PORT} echo "${HOST} ${GRPC_PORT} ${GREMLIN_PORT}" - python3 -c 'import base64;import os;from gremlin_python.driver.client import Client;ip=os.getenv("HOST");gremlin_port=os.getenv("GREMLIN_PORT");graph_url=f"ws://{ip}:{gremlin_port}/gremlin";username=os.getenv("GROOT_USERNAME");password=base64.b64decode(os.getenv("GROOT_PASSWORD")).decode("utf-8");client = Client(graph_url, "g", username=username, password=password); ret = client.submit("g.V().limit(1)").all().result(); client.close();' && break + cmd="nc -zv ${HOST} ${GREMLIN_PORT}" + res=$(eval $cmd 2>&1) + if [[ $res == *succeeded* ]]; then + # nc output contains "succeeded" once the gremlin port is reachable + break + fi sleep 3 done diff --git a/charts/graphscope-store/templates/portal/statefulset.yaml b/charts/graphscope-store/templates/portal/statefulset.yaml index 1e8c5ee471cb..17a5b975cbc3 100644 --- a/charts/graphscope-store/templates/portal/statefulset.yaml +++ b/charts/graphscope-store/templates/portal/statefulset.yaml @@ -115,6 +115,8 @@ spec: value: {{ .Values.portal.runtimePath | quote }} - name: STUDIO_WRAPPER_ENDPOINT value: {{ .Values.portal.studioWrapperEndpoint | quote }} + - name: BASEID + value: {{ .Values.portal.baseId | quote }} - name: SOLUTION value: "GRAPHSCOPE_INSIGHT" {{- range $key, $value := .Values.env }} diff --git a/charts/graphscope-store/values.yaml 
b/charts/graphscope-store/values.yaml index 915f2f66d53d..5f8126aae4f0 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -295,6 +295,9 @@ portal: ## Request for data loading ## studioWrapperEndpoint: "" + ## + ## baseId is the id used for creating odps dataloading job + baseId: "" ## @param hostIPC Specify if host IPC should be enabled for pods ## hostIPC: false diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py index f4955102d8bf..a3f6f4a7a56f 100644 --- a/coordinator/gscoordinator/coordinator.py +++ b/coordinator/gscoordinator/coordinator.py @@ -29,9 +29,11 @@ import connexion import grpc +from flask_cors import CORS from graphscope.config import Config from graphscope.proto import coordinator_service_pb2_grpc +from gscoordinator.flex.core.client_wrapper import initialize_client_wrapper from gscoordinator.flex.encoder import JSONEncoder from gscoordinator.monitor import Monitor from gscoordinator.servicer import init_graphscope_one_service_servicer @@ -125,6 +127,7 @@ def get_servicer(config: Config): def start_http_service(config): + initialize_client_wrapper(config) app = connexion.App(__name__, specification_dir="./flex/openapi/") app.app.json_encoder = JSONEncoder app.add_api( @@ -132,6 +135,8 @@ def start_http_service(config): arguments={"title": "GraphScope FLEX HTTP SERVICE API"}, pythonic_params=True, ) + # support cross origin. 
+ CORS(app.app) app.run(port=config.coordinator.http_port) diff --git a/coordinator/gscoordinator/flex/controllers/data_source_controller.py b/coordinator/gscoordinator/flex/controllers/data_source_controller.py index 792d6e5a11df..2d016dee40df 100644 --- a/coordinator/gscoordinator/flex/controllers/data_source_controller.py +++ b/coordinator/gscoordinator/flex/controllers/data_source_controller.py @@ -7,7 +7,7 @@ from gscoordinator.flex.models.schema_mapping import SchemaMapping # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -26,7 +26,7 @@ def bind_datasource_in_batch(graph_id, schema_mapping): # noqa: E501 """ if connexion.request.is_json: schema_mapping = SchemaMapping.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.bind_datasource_in_batch(graph_id, schema_mapping) + return get_client_wrapper().bind_datasource_in_batch(graph_id, schema_mapping) @handle_api_exception() @@ -40,7 +40,7 @@ def get_datasource_by_id(graph_id): # noqa: E501 :rtype: Union[SchemaMapping, Tuple[SchemaMapping, int], Tuple[SchemaMapping, int, Dict[str, str]] """ - return client_wrapper.get_datasource_by_id(graph_id) + return get_client_wrapper().get_datasource_by_id(graph_id) @handle_api_exception() @@ -60,7 +60,7 @@ def unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_ :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type) + return get_client_wrapper().unbind_edge_datasource(graph_id, type_name, source_vertex_type, destination_vertex_type) @handle_api_exception() @@ -76,4 +76,4 @@ def unbind_vertex_datasource(graph_id, type_name): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return 
client_wrapper.unbind_vertex_datasource(graph_id, type_name) + return get_client_wrapper().unbind_vertex_datasource(graph_id, type_name) diff --git a/coordinator/gscoordinator/flex/controllers/deployment_controller.py b/coordinator/gscoordinator/flex/controllers/deployment_controller.py index 98ba7375f920..fc3a8d0f27c1 100644 --- a/coordinator/gscoordinator/flex/controllers/deployment_controller.py +++ b/coordinator/gscoordinator/flex/controllers/deployment_controller.py @@ -11,7 +11,7 @@ from gscoordinator.flex.models.running_deployment_status import RunningDeploymentStatus # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -24,7 +24,7 @@ def get_deployment_info(): # noqa: E501 :rtype: Union[RunningDeploymentInfo, Tuple[RunningDeploymentInfo, int], Tuple[RunningDeploymentInfo, int, Dict[str, str]] """ - return client_wrapper.get_deployment_info() + return get_client_wrapper().get_deployment_info() @handle_api_exception() @@ -42,7 +42,7 @@ def get_deployment_pod_log(pod_name, component, from_cache): # noqa: E501 :rtype: Union[GetPodLogResponse, Tuple[GetPodLogResponse, int], Tuple[GetPodLogResponse, int, Dict[str, str]] """ - return client_wrapper.get_deployment_pod_log(pod_name, component, from_cache) + return get_client_wrapper().get_deployment_pod_log(pod_name, component, from_cache) @handle_api_exception() @@ -54,7 +54,7 @@ def get_deployment_resource_usage(): # noqa: E501 :rtype: Union[GetResourceUsageResponse, Tuple[GetResourceUsageResponse, int], Tuple[GetResourceUsageResponse, int, Dict[str, str]] """ - return client_wrapper.get_deployment_resource_usage() + return get_client_wrapper().get_deployment_resource_usage() @handle_api_exception() @@ -66,7 +66,7 @@ def get_deployment_status(): # noqa: E501 :rtype: Union[RunningDeploymentStatus, Tuple[RunningDeploymentStatus, int], 
Tuple[RunningDeploymentStatus, int, Dict[str, str]] """ - return client_wrapper.get_deployment_status() + return get_client_wrapper().get_deployment_status() @handle_api_exception() @@ -78,4 +78,4 @@ def get_storage_usage(): # noqa: E501 :rtype: Union[GetStorageUsageResponse, Tuple[GetStorageUsageResponse, int], Tuple[GetStorageUsageResponse, int, Dict[str, str]] """ - return client_wrapper.get_storage_usage() + return get_client_wrapper().get_storage_usage() diff --git a/coordinator/gscoordinator/flex/controllers/graph_controller.py b/coordinator/gscoordinator/flex/controllers/graph_controller.py index ea7a6020610f..41c56c6e0897 100644 --- a/coordinator/gscoordinator/flex/controllers/graph_controller.py +++ b/coordinator/gscoordinator/flex/controllers/graph_controller.py @@ -13,7 +13,7 @@ from gscoordinator.flex.models.get_graph_schema_response import GetGraphSchemaResponse # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -32,7 +32,7 @@ def create_edge_type(graph_id, create_edge_type=None): # noqa: E501 """ if connexion.request.is_json: create_edge_type = CreateEdgeType.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.create_edge_type(graph_id, create_edge_type) + return get_client_wrapper().create_edge_type(graph_id, create_edge_type) @handle_api_exception() @@ -48,7 +48,7 @@ def create_graph(create_graph_request): # noqa: E501 """ if connexion.request.is_json: create_graph_request = CreateGraphRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.create_graph(create_graph_request) + return get_client_wrapper().create_graph(create_graph_request) @handle_api_exception() @@ -66,7 +66,7 @@ def create_vertex_type(graph_id, create_vertex_type): # noqa: E501 """ if connexion.request.is_json: create_vertex_type = 
CreateVertexType.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.create_vertex_type(graph_id, create_vertex_type) + return get_client_wrapper().create_vertex_type(graph_id, create_vertex_type) @handle_api_exception() @@ -86,7 +86,7 @@ def delete_edge_type_by_name(graph_id, type_name, source_vertex_type, destinatio :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_edge_type_by_name( + return get_client_wrapper().delete_edge_type_by_name( graph_id, type_name, source_vertex_type, destination_vertex_type ) @@ -102,7 +102,7 @@ def delete_graph_by_id(graph_id): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_graph_by_id(graph_id) + return get_client_wrapper().delete_graph_by_id(graph_id) @handle_api_exception() @@ -118,7 +118,7 @@ def delete_vertex_type_by_name(graph_id, type_name): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_vertex_type_by_name(graph_id, type_name) + return get_client_wrapper().delete_vertex_type_by_name(graph_id, type_name) @handle_api_exception() @@ -132,7 +132,7 @@ def get_graph_by_id(graph_id): # noqa: E501 :rtype: Union[GetGraphResponse, Tuple[GetGraphResponse, int], Tuple[GetGraphResponse, int, Dict[str, str]] """ - return client_wrapper.get_graph_by_id(graph_id) + return get_client_wrapper().get_graph_by_id(graph_id) @handle_api_exception() @@ -146,7 +146,7 @@ def get_schema_by_id(graph_id): # noqa: E501 :rtype: Union[GetGraphSchemaResponse, Tuple[GetGraphSchemaResponse, int], Tuple[GetGraphSchemaResponse, int, Dict[str, str]] """ - return client_wrapper.get_schema_by_id(graph_id) + return get_client_wrapper().get_schema_by_id(graph_id) @handle_api_exception() @@ -164,7 +164,7 @@ def import_schema_by_id(graph_id, create_graph_schema_request): # noqa: E501 """ if connexion.request.is_json: create_graph_schema_request = 
CreateGraphSchemaRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.import_schema(graph_id, create_graph_schema_request) + return get_client_wrapper().import_schema(graph_id, create_graph_schema_request) @handle_api_exception() @@ -176,4 +176,4 @@ def list_graphs(): # noqa: E501 :rtype: Union[List[GetGraphResponse], Tuple[List[GetGraphResponse], int], Tuple[List[GetGraphResponse], int, Dict[str, str]] """ - return client_wrapper.list_graphs() + return get_client_wrapper().list_graphs() diff --git a/coordinator/gscoordinator/flex/controllers/job_controller.py b/coordinator/gscoordinator/flex/controllers/job_controller.py index ef5c58550a2c..30b218fff017 100644 --- a/coordinator/gscoordinator/flex/controllers/job_controller.py +++ b/coordinator/gscoordinator/flex/controllers/job_controller.py @@ -10,7 +10,7 @@ from gscoordinator.flex.models.job_status import JobStatus # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -27,7 +27,7 @@ def delete_job_by_id(job_id, delete_scheduler=None): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_job_by_id(job_id, delete_scheduler) + return get_client_wrapper().delete_job_by_id(job_id, delete_scheduler) @handle_api_exception() @@ -45,7 +45,7 @@ def get_dataloading_job_config(graph_id, dataloading_job_config): # noqa: E501 """ if connexion.request.is_json: dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.get_dataloading_job_config(graph_id, dataloading_job_config) + return get_client_wrapper().get_dataloading_job_config(graph_id, dataloading_job_config) @handle_api_exception() @@ -59,7 +59,7 @@ def get_job_by_id(job_id): # noqa: E501 :rtype: Union[JobStatus, Tuple[JobStatus, int], Tuple[JobStatus, 
int, Dict[str, str]] """ - return client_wrapper.get_job_by_id(job_id) + return get_client_wrapper().get_job_by_id(job_id) @handle_api_exception() @@ -71,7 +71,7 @@ def list_jobs(): # noqa: E501 :rtype: Union[List[JobStatus], Tuple[List[JobStatus], int], Tuple[List[JobStatus], int, Dict[str, str]] """ - return client_wrapper.list_jobs() + return get_client_wrapper().list_jobs() @handle_api_exception() @@ -89,4 +89,4 @@ def submit_dataloading_job(graph_id, dataloading_job_config): # noqa: E501 """ if connexion.request.is_json: dataloading_job_config = DataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.submit_dataloading_job(graph_id, dataloading_job_config) + return get_client_wrapper().submit_dataloading_job(graph_id, dataloading_job_config) diff --git a/coordinator/gscoordinator/flex/controllers/service_controller.py b/coordinator/gscoordinator/flex/controllers/service_controller.py index d37fae8bb9c7..dceda84a7291 100644 --- a/coordinator/gscoordinator/flex/controllers/service_controller.py +++ b/coordinator/gscoordinator/flex/controllers/service_controller.py @@ -8,7 +8,7 @@ from gscoordinator.flex.models.start_service_request import StartServiceRequest # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -22,7 +22,7 @@ def get_service_status_by_id(graph_id): # noqa: E501 :rtype: Union[ServiceStatus, Tuple[ServiceStatus, int], Tuple[ServiceStatus, int, Dict[str, str]] """ - return client_wrapper.get_service_status_by_id(graph_id) + return get_client_wrapper().get_service_status_by_id(graph_id) @handle_api_exception() @@ -34,7 +34,7 @@ def list_service_status(): # noqa: E501 :rtype: Union[List[ServiceStatus], Tuple[List[ServiceStatus], int], Tuple[List[ServiceStatus], int, Dict[str, str]] """ - return client_wrapper.list_service_status() + return 
get_client_wrapper().list_service_status() @handle_api_exception() @@ -46,7 +46,7 @@ def restart_service(): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.restart_service() + return get_client_wrapper().restart_service() @handle_api_exception() @@ -62,7 +62,7 @@ def start_service(start_service_request=None): # noqa: E501 """ if connexion.request.is_json: start_service_request = StartServiceRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.start_service(start_service_request) + return get_client_wrapper().start_service(start_service_request) @handle_api_exception() @@ -74,4 +74,4 @@ def stop_service(): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.stop_service() + return get_client_wrapper().stop_service() diff --git a/coordinator/gscoordinator/flex/controllers/stored_procedure_controller.py b/coordinator/gscoordinator/flex/controllers/stored_procedure_controller.py index 9410963119af..f0e84f820961 100644 --- a/coordinator/gscoordinator/flex/controllers/stored_procedure_controller.py +++ b/coordinator/gscoordinator/flex/controllers/stored_procedure_controller.py @@ -10,7 +10,7 @@ from gscoordinator.flex.models.update_stored_proc_request import UpdateStoredProcRequest # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -29,7 +29,7 @@ def create_stored_procedure(graph_id, create_stored_proc_request): # noqa: E501 """ if connexion.request.is_json: create_stored_proc_request = CreateStoredProcRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.create_stored_procedure(graph_id, create_stored_proc_request) + return get_client_wrapper().create_stored_procedure(graph_id, create_stored_proc_request) @handle_api_exception() 
@@ -45,7 +45,7 @@ def delete_stored_procedure_by_id(graph_id, stored_procedure_id): # noqa: E501 :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]] """ - return client_wrapper.delete_stored_procedure_by_id(graph_id, stored_procedure_id) + return get_client_wrapper().delete_stored_procedure_by_id(graph_id, stored_procedure_id) @handle_api_exception() @@ -61,7 +61,7 @@ def get_stored_procedure_by_id(graph_id, stored_procedure_id): # noqa: E501 :rtype: Union[GetStoredProcResponse, Tuple[GetStoredProcResponse, int], Tuple[GetStoredProcResponse, int, Dict[str, str]] """ - return client_wrapper.get_stored_procedure_by_id(graph_id, stored_procedure_id) + return get_client_wrapper().get_stored_procedure_by_id(graph_id, stored_procedure_id) @handle_api_exception() @@ -75,7 +75,7 @@ def list_stored_procedures(graph_id): # noqa: E501 :rtype: Union[List[GetStoredProcResponse], Tuple[List[GetStoredProcResponse], int], Tuple[List[GetStoredProcResponse], int, Dict[str, str]] """ - return client_wrapper.list_stored_procedures(graph_id) + return get_client_wrapper().list_stored_procedures(graph_id) @handle_api_exception() @@ -95,4 +95,4 @@ def update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_p """ if connexion.request.is_json: update_stored_proc_request = UpdateStoredProcRequest.from_dict(connexion.request.get_json()) # noqa: E501 - return client_wrapper.update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_proc_request) + return get_client_wrapper().update_stored_procedure_by_id(graph_id, stored_procedure_id, update_stored_proc_request) diff --git a/coordinator/gscoordinator/flex/controllers/utils_controller.py b/coordinator/gscoordinator/flex/controllers/utils_controller.py index 731bdb5f8645..0f80be8ea4f3 100644 --- a/coordinator/gscoordinator/flex/controllers/utils_controller.py +++ b/coordinator/gscoordinator/flex/controllers/utils_controller.py @@ -7,7 +7,7 @@ from gscoordinator.flex.models.upload_file_response 
import UploadFileResponse # noqa: E501 from gscoordinator.flex import util -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core import handle_api_exception @@ -22,4 +22,4 @@ def upload_file(filestorage=None): # noqa: E501 :rtype: Union[UploadFileResponse, Tuple[UploadFileResponse, int], Tuple[UploadFileResponse, int, Dict[str, str]] """ - return client_wrapper.upload_file(filestorage) + return get_client_wrapper().upload_file(filestorage) diff --git a/coordinator/gscoordinator/flex/core/__init__.py b/coordinator/gscoordinator/flex/core/__init__.py index 606078a84fba..11501683b369 100644 --- a/coordinator/gscoordinator/flex/core/__init__.py +++ b/coordinator/gscoordinator/flex/core/__init__.py @@ -21,5 +21,6 @@ # Disable warnings warnings.filterwarnings("ignore", category=Warning) -from gscoordinator.flex.core.client_wrapper import client_wrapper # noqa: F401, E402 +from gscoordinator.flex.core.client_wrapper import \ + get_client_wrapper # noqa: F401, E402 from gscoordinator.flex.core.utils import handle_api_exception # noqa: F401, E402 diff --git a/coordinator/gscoordinator/flex/core/alert/builtin_rules.py b/coordinator/gscoordinator/flex/core/alert/builtin_rules.py index fb22b9478b3c..1344e4186fb8 100644 --- a/coordinator/gscoordinator/flex/core/alert/builtin_rules.py +++ b/coordinator/gscoordinator/flex/core/alert/builtin_rules.py @@ -23,7 +23,7 @@ import psutil from gremlin_python.driver.client import Client -from gscoordinator.flex.core import client_wrapper +from gscoordinator.flex.core import get_client_wrapper from gscoordinator.flex.core.alert.alert_rule import AlertRule from gscoordinator.flex.core.alert.message_collector import AlertMessageCollector from gscoordinator.flex.core.config import CLUSTER_TYPE @@ -65,7 +65,7 @@ def run_alert(self): try: alert_nodes = [] disk_usages = [] - disk_utils = client_wrapper.get_storage_usage().to_dict() + disk_utils = 
get_client_wrapper().get_storage_usage().to_dict() for node, usage in disk_utils["storage_usage"].items(): if float(usage) > self._threshold: alert_nodes.append(node) @@ -107,7 +107,7 @@ def __init__( def run_alert(self): """This function needs to handle exception by itself""" try: - available = client_wrapper.gremlin_service_available() + available = get_client_wrapper().gremlin_service_available() if not available: message = f"Gremlin service unavailable: unknown reason" except Exception as e: diff --git a/coordinator/gscoordinator/flex/core/client_wrapper.py b/coordinator/gscoordinator/flex/core/client_wrapper.py index 7d39bd03084b..d16c41da702f 100644 --- a/coordinator/gscoordinator/flex/core/client_wrapper.py +++ b/coordinator/gscoordinator/flex/core/client_wrapper.py @@ -22,6 +22,8 @@ import threading from typing import List +from graphscope.config import Config + from gscoordinator.flex.core.config import CLUSTER_TYPE from gscoordinator.flex.core.config import DATASET_WORKSPACE from gscoordinator.flex.core.config import SOLUTION @@ -61,17 +63,17 @@ class ClientWrapper(object): """Wrapper of client that interacts with engine""" - def __init__(self): + def __init__(self, config: Config): # lock to protect the service self._lock = threading.RLock() # initialize specific client - self._client = self._initialize_client() + self._client = self._initialize_client(config) # data source management self._datasource_manager = DataSourceManager() # deployment self._deployment = initialize_deployemnt() - def _initialize_client(self): + def _initialize_client(self, config: Config): service_initializer = { "INTERACTIVE": init_hqps_client, "GRAPHSCOPE_INSIGHT": init_groot_client, @@ -80,7 +82,7 @@ def _initialize_client(self): if initializer is None: logger.warn(f"Client initializer of {SOLUTION} not found.") return None - return initializer() + return initializer(config) def list_graphs(self) -> List[GetGraphResponse]: graphs = self._client.list_graphs() @@ -368,4 +370,12 
@@ def gremlin_service_available(self) -> bool: return self._client.gremlin_service_available() -client_wrapper = ClientWrapper() +client_wrapper = None + +# Module-level singleton; set once at startup via initialize_client_wrapper(). +def initialize_client_wrapper(config=None): + global client_wrapper + client_wrapper = ClientWrapper(config) + +def get_client_wrapper(): + return client_wrapper diff --git a/coordinator/gscoordinator/flex/core/insight/groot.py b/coordinator/gscoordinator/flex/core/insight/groot.py index a5aaa6738daa..a1b7dede86f7 100644 --- a/coordinator/gscoordinator/flex/core/insight/groot.py +++ b/coordinator/gscoordinator/flex/core/insight/groot.py @@ -26,6 +26,7 @@ from typing import List import psutil +from graphscope.config import Config from gremlin_python.driver.client import Client from gscoordinator.flex.core.config import CLUSTER_TYPE @@ -307,5 +308,5 @@ def gremlin_service_available(self) -> bool: return True -def init_groot_client(): +def init_groot_client(config: Config): return GrootClient() diff --git a/coordinator/gscoordinator/flex/core/insight/job.py b/coordinator/gscoordinator/flex/core/insight/job.py index dad498e78852..c5eeeb121b1a 100644 --- a/coordinator/gscoordinator/flex/core/insight/job.py +++ b/coordinator/gscoordinator/flex/core/insight/job.py @@ -19,6 +19,7 @@ import datetime import http.client import json +import logging import time import urllib.parse @@ -35,6 +36,7 @@ from gscoordinator.flex.core.utils import encode_datetime from gscoordinator.flex.models import JobStatus +logger = logging.getLogger("graphscope") class FetchDataloadingJobStatus(object): def __init__(self, graph, status: JobStatus): @@ -307,12 +309,14 @@ def run(self): json.dumps(configini), headers={"Content-type": "application/json"}, ) - r = conn.getresponse() - if r.status > 400 and r.status < 600: + resp = conn.getresponse() + data = resp.read().decode("utf-8") + if resp.status != 200: + logger.error("Failed to submit dataloading job, code: %s, data: %s", resp.status, data) 
raise RuntimeError( - "Failed to submit dataloading job: " + r.read().decode("utf-8") + f"Failed to submit dataloading job, code: {resp.status}, data: {data}" ) - rlt = json.loads(r.read().decode("utf-8")) + rlt = json.loads(data) if rlt["success"]: self._jobid = rlt["data"] status = self.generate_job_status( @@ -325,6 +329,7 @@ log=rlt["message"], ) except Exception as e: + logger.error("Exception occurred: %s", e) status = self.generate_job_status( status="FAILED", end_time=datetime.datetime.now(), log=str(e) ) diff --git a/coordinator/gscoordinator/flex/core/interactive/hqps.py b/coordinator/gscoordinator/flex/core/interactive/hqps.py index 202df40a1436..0a601f562988 100644 --- a/coordinator/gscoordinator/flex/core/interactive/hqps.py +++ b/coordinator/gscoordinator/flex/core/interactive/hqps.py @@ -28,6 +28,7 @@ import gs_interactive import psutil import requests +from graphscope.config import Config from gs_interactive.models.create_graph_request import CreateGraphRequest from gs_interactive.models.create_procedure_request import CreateProcedureRequest from gs_interactive.models.schema_mapping import SchemaMapping @@ -49,9 +50,15 @@ class HQPSClient(object): """Class used to interact with hqps engine""" - def __init__(self): + def __init__(self, config: Config): # hqps admin service endpoint self._hqps_endpoint = self._get_hqps_service_endpoints() + self._port_mapping = config.interactive.port_mapping + + def _get_mapped_port(self, port: int) -> int: + if self._port_mapping and port in self._port_mapping: + return self._port_mapping[port] + return port def _get_hqps_service_endpoints(self): if CLUSTER_TYPE == "HOSTS": @@ -229,7 +236,7 @@ def list_service_status(self) -> List[dict]: api_instance = gs_interactive.AdminServiceServiceManagementApi(api_client) response = api_instance.get_service_status() if CLUSTER_TYPE == "HOSTS": - host = get_internal_ip() + host = '127.0.0.1' # for interactive deployed in hosts, we could not determine the 
public ip in container. So we let user to replace with the public ip. if response.status == "Running" and response.graph is not None: g = response.graph.to_dict() serving_graph_id = g["id"] @@ -239,10 +246,11 @@ def list_service_status(self) -> List[dict]: status = { "status": response.status, "sdk_endpoints": { - "cypher": f"neo4j://{host}:{response.bolt_port} (internal)", - "hqps": f"http://{host}:{response.hqps_port} (internal)", - "gremlin": f"ws://{host}:{response.gremlin_port}/gremlin (internal)", + "cypher": f"neo4j://{host}:{self._get_mapped_port(response.bolt_port)}", + "hqps": f"http://{host}:{self._get_mapped_port(response.hqps_port)}", + "gremlin": f"ws://{host}:{self._get_mapped_port(response.gremlin_port)}/gremlin", }, + "info": "Replace 127.0.0.1 with public ip if connecting from outside", "start_time": service_start_time, "graph_id": g["id"], } @@ -372,5 +380,5 @@ def import_schema(self, graph_id, schema: dict): raise RuntimeError("Method is not supported.") -def init_hqps_client(): - return HQPSClient() +def init_hqps_client(config: Config): + return HQPSClient(config) diff --git a/coordinator/requirements.txt b/coordinator/requirements.txt index 9d6de36eccd4..3a68eb7db874 100644 --- a/coordinator/requirements.txt +++ b/coordinator/requirements.txt @@ -14,6 +14,7 @@ werkzeug == 3.0.3; python_version=="3.5" or python_version=="3.4" swagger-ui-bundle >= 0.0.2 python_dateutil >= 2.6.0 Flask == 2.2.5 +Flask-Cors == 5.0.0 urllib3 >= 1.25.3, < 2.1.0 pydantic >= 2 typing-extensions >= 4.7.1 diff --git a/docs/flex/interactive/data_import.md b/docs/flex/interactive/data_import.md index 7b7b454b5aa6..90d71eb350bc 100644 --- a/docs/flex/interactive/data_import.md +++ b/docs/flex/interactive/data_import.md @@ -227,7 +227,7 @@ The table below offers a detailed breakdown of each configuration item. In this | loading_config.scheme | file | The source of input data. 
Currently only `file` and `odps` are supported | No | | loading_config.format | N/A | The format of the raw data in CSV | Yes | | loading_config.format.metadata | N/A | Mainly for configuring the options for reading CSV | Yes | -| loading_config.format.metadata.delimiter | '\|' | Delimiter used to split a row of data | Yes | +| loading_config.format.metadata.delimiter | '|' | Delimiter used to split a row of data, escaped char are also supported, i.e. '\t' | Yes | | loading_config.format.metadata.header_row | true | Indicate if the first row should be used as the header | No | | loading_config.format.metadata.quoting | false | Whether quoting is used | No | | loading_config.format.metadata.quote_char | '\"' | Quoting character (if `quoting` is true) | No | diff --git a/docs/interactive_engine/tinkerpop/faq.md b/docs/interactive_engine/tinkerpop/faq.md index 1e6d59791425..dcad710220b4 100644 --- a/docs/interactive_engine/tinkerpop/faq.md +++ b/docs/interactive_engine/tinkerpop/faq.md @@ -1,28 +1,31 @@ # FAQs for GIE Gremlin Usage ## Compatibility with TinkerPop + GIE supports the property graph model and Gremlin traversal language defined by Apache TinkerPop, and provides a Gremlin WebSockets server that supports TinkerPop version 3.4. In addition to the original Gremlin queries, we further introduce some syntactic sugars to allow more succinct expression. However, because of the distributed nature and practical considerations, it is worth to notice the following limitations of our implementations of Gremlin. - Functionalities + - Graph mutations. - Lambda and Groovy expressions and functions, such as the `.map{}`, the `.by{}`, and the `.filter{}` functions, and `System.currentTimeMillis()`, etc. By the way, we have provided the `expr()` [syntactic sugar](../interactive_engine/supported_gremlin_steps.md) to handle complex expressions. - Gremlin traversal strategies. - Transactions. - Secondary index isn’t currently available. 
Primary keys will be automatically indexed. - - Gremlin Steps: See [here](supported_gremlin_steps.md) for a complete supported/unsupported list of Gremlin. ## Property Graph Constraints + The current release of GIE supports two graph stores: one leverages [Vineyard](https://v6d.io/) to supply an in-memory store for immutable graph data, and the other, called [groot](../storage_engine/groot.md), is developed on top of [RocksDB](https://rocksdb.org/) that also provides real-time write and data consistency via [snapshot isolation](https://en.wikipedia.org/wiki/Snapshot_isolation). Both stores support graph data being partitioned across multiple servers. By design, the following constraints are introduced (on both stores): - - Each graph has a schema comprised of the edge labels, property keys, and vertex labels used therein. - - Each vertex type or label has a primary key (property) defined by user. The system will automatically + +- Each graph has a schema comprised of the edge labels, property keys, and vertex labels used therein. +- Each vertex type or label has a primary key (property) defined by user. The system will automatically generate a String-typed unique identifier for each vertex and edge, encoding both the label information as well as user-defined primary keys (for vertex). - - Each vertex or edge property can be of the following data types: `int`, `long`, `float`, `double`, +- Each vertex or edge property can be of the following data types: `int`, `long`, `float`, `double`, `String`, `List`, `List`, and `List`. ## What's the difference between Inner ID and Property ID ? @@ -32,6 +35,7 @@ The main difference between Inner ID and Property ID is that Inner ID is a syste For example, in the LDBC (Linked Data Benchmark Council) schema, we have an entity type called 'PERSON', which has its own list of properties, consisting of 'id', 'name' and 'birthday'. 
In the actual storage, we maintain key-value pairs for each instance of entity type 'PERSON', and internally maintain a unique ID to differentiate each such instance. The unique ID in this context is referred to as the Inner ID, and the 'id' in the attribute list is the Property ID. GIE Gremlin provides different approaches to query a vertex instance by its Inner ID or Property ID, similar to: + ```scss // by its inner id g.V(123456) @@ -45,6 +49,7 @@ In the above case, the vertex may have a property `id` with value 1, which is ma unique inner id `123456`. For edges, we do not currently provide any approaches to query based on Inner ID, for two reasons: + - Firstly, Inner ID is internally maintained by the system and should not be exposed to users by default. - Secondly, a single edge instance may not be uniquely identified by Inner ID alone, as it typically requires a triplet such as \. @@ -53,23 +58,29 @@ For edges, we do not currently provide any approaches to query based on Inner ID With path_expand, users can define their desired path pattern concretely and further define corresponding characteristics based on that path. For example, if an entity of type 'PERSON' wants to find instances that can be reached by 3-hops, in traditional Gremlin, it can only be represented as ```g.V().hasLabel('PERSON').both().both().both()```. With path_expand, it can be represented more concisely as ```g.V().hasLabel('PERSON').both('3..4').endV()```, where ```both('3..4')``` represents the path pattern, and ```'3..4'``` specifies the range of hops as [3, 4). We can further define characteristics of the path pattern using path_expand. 
For example, if an entity of type 'PERSON' wants to find instances that can be reached by 3-hops, while ensuring that the path is a simple path (no repeated vertices or edges), it can be represented as: + ```scss g.V().hasLabel('PERSON').both('3..4').with('PATH_OPT', 'SIMPLE').endV() ``` -You can refer to [PathExpand](https://github.com/alibaba/GraphScope/blob/main/docs/interactive_engine/supported_gremlin_steps.md#pathexpand) for more examples and usage of path_expand. +You can refer to [PathExpand](https://github.com/alibaba/GraphScope/blob/main/docs/interactive_engine/tinkerpop/supported_gremlin_steps.md#pathexpand) for more examples and usage of path_expand. ## How to filter data in GIE Gremlin like SQL ? With ```expr```, We can support SQL-like expressions in GIE Gremlin. For example, if we want to find all 'PERSON' instances with either the name 'marko' or the age '27', we can represent it as follows: + ```scss g.V().hasLabel('PERSON').where(expr('@.name=\"marko\" || @.age = 27')) ``` + In traditional Gremlin, it can only be represented as follows: + ```scss g.V().hasLabel('PERSON').has('name', 'marko').or().has('age', 27) ``` + It is equivalent to the following SQL-like expression: + ```scss SELECT * FROM PERSON @@ -77,19 +88,24 @@ WHERE name = 'marko' OR age = 27; ``` Traditional Gremlin uses the ```HasStep``` operator to support filter queries, which has some limitations compared to the ```Where``` operator in SQL: + - ```HasStep``` can only express query filters based on the current vertex or edge and their properties, without the ability to cross multiple vertices or edges. - On the other hand, ```HasStep``` in Gremlin for complex expressions may not be as intuitive as in SQL. -We have addressed the limitations and shortcomings of Gremlin in filter expression by using ```expr```, for more usage, please refer to [Expression](https://github.com/alibaba/GraphScope/blob/main/docs/interactive_engine/supported_gremlin_steps.md#expression). 
+We have addressed the limitations and shortcomings of Gremlin in filter expression by using ```expr```, for more usage, please refer to [Expression](https://github.com/alibaba/GraphScope/blob/main/docs/interactive_engine/tinkerpop/supported_gremlin_steps.md#expression). ## How to aggregate data in GIE Gremlin like SQL? We further extend the ```group``` operator in Gremlin to support multi-column grouping operations, similar to those in SQL. + ### group by multiple keys + ```scss g.V().hasLabel('PERSON').groupCount().by('name', 'age') ``` + which is equivalent to: + ```scss SELECT PERSON.name, @@ -101,7 +117,9 @@ GROUP BY PERSON.name, PERSON.age ``` + ### group by multiple values: + ```scss g.V() .hasLabel('PERSON') @@ -109,7 +127,9 @@ g.V() .by('name') .by(count('age').as('age_cnt'), sum('age').as('age_sum')) ``` + which is equivalent to : + ```scss SELECT PERSON.name, @@ -120,20 +140,25 @@ FROM GROUP BY name ``` -Please refer to [Aggregate](https://github.com/alibaba/GraphScope/blob/main/docs/interactive_engine/supported_gremlin_steps.md#aggregate-group) for more usage. + +Please refer to [Aggregate](https://github.com/alibaba/GraphScope/blob/main/docs/interactive_engine/tinkerpop/supported_gremlin_steps.md#aggregate-group) for more usage. ## How to optimize Gremlin queries for performance in GIE? ### Use appropriate indexing + GIE supports various indexing options such as vertex label index, primary key index, and edge label index. Properly defining and using indexes can significantly improve query performance. For example, in the LDBC schema, we define the property ID as the primary key for the entity type 'PERSON' and maintain the corresponding primary key index in the storage. This allows us to directly index specific 'PERSON' instances using \, without scanning all vertices and filtering based on property key-value. 
This can be expressed in a Gremlin query as follows: + ```scss g.V().hasLabel('PERSON').has('id', propertyIdValue) ``` -Where 'id' is the property ID, and 'propertyIdValue' is the value of the property key. + +Where 'id' is the property ID, and 'propertyIdValue' is the value of the property key. Moreover, we support the `within` operator to query multiple values of the same property key, which can also be optimized by the primary key index. For example: + ```scss g.V().hasLabel('PERSON').has('id', within(propertyIdValue1, propertyIdValue2)) ``` @@ -149,4 +174,15 @@ Therefore, You can only perform subgraph operations after edge-output operators ```scss g.V().outE().limit(10).subgraph('sub_graph').count() ``` -Please refer to [Subgraph](https://github.com/alibaba/GraphScope/blob/main/docs/interactive_engine/supported_gremlin_steps.md#subgraph) for more usage. + +Please refer to [Subgraph](https://github.com/alibaba/GraphScope/blob/main/docs/interactive_engine/tinkerpop/supported_gremlin_steps.md#subgraph) for more usage. + +## Suggestions About Parallism Settings for Queries + +We support per-query settings for query parallelism, using the syntax `g.with('pegasus.worker.num', $worker_num)`. The maximum parallelism is bound by the number of machine cores. In our engine’s scheduling, we employ a first-come, first-served strategy. + +If your query workloads is high-QPS, consisting of numerous small queries such as querying attributes or neighbor points from a single source vertex, we recommend lowering `worker_num` to 1. This adjustment allows the engine to allocate sufficient workers to handle these various small queries concurrently. + +Conversely, if you are dealing with costly large queries that start from large number of source vertices and involve complex operations, we suggest increasing `worker_num` (e.g., 16 or 32). This way, the engine can enhance query parallelism and improve overall query efficiency for each query. 
+ +In scenarios where both small and large queries run concurrently, it is advisable to assign a reasonable number of workers to large queries (ensuring that they do not occupy all available workers). This approach helps prevent small queries from being blocked by large queries, which could monopolize all workers and result in high latency for smaller queries. diff --git a/docs/overview/getting_started.md b/docs/overview/getting_started.md index 366ad7bf2f0d..07f1572cf880 100644 --- a/docs/overview/getting_started.md +++ b/docs/overview/getting_started.md @@ -80,7 +80,7 @@ In this example, we use graph traversal to count the number of papers two given ```python # get the endpoint for submitting interactive queries on graph g. -interactive = graphscope.interactive(g) +interactive = graphscope.interactive(g, with_cypher=True) # Gremlin query for counting the number of papers two authors (with id 2 and 4307) have co-authored papers = interactive.execute("g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()").one() @@ -261,7 +261,7 @@ gs.set_option(show_log=True) graph = load_modern_graph() # Hereafter, you can use the `graph` object to create an `interactive` query session, which will start one Gremlin service and one Cypher service simultaneously on the backend. -g = gs.interactive(graph) +g = gs.interactive(graph, with_cypher=True) # then `execute` any supported gremlin query. 
q1 = g.execute('g.V().count()') print(q1.all().result()) # should print [6] @@ -270,7 +270,7 @@ q2 = g.execute('g.V().hasLabel(\'person\')') print(q2.all().result()) # should print [[v[2], v[3], v[0], v[1]]] # or `execute` any supported Cypher query -q3 = g.execute("MATCH (n:person) RETURN count(n)", lang="cypher", routing_=RoutingControl.READ) +q3 = g.execute("MATCH (n:person) RETURN count(n)", lang="cypher") print(q3.records[0][0]) # should print 6 ``` ```` diff --git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc index 1b8fff41a8f6..55601315c43b 100644 --- a/flex/engines/graph_db/database/graph_db.cc +++ b/flex/engines/graph_db/database/graph_db.cc @@ -305,7 +305,12 @@ const Schema& GraphDB::schema() const { return graph_.schema(); } std::shared_ptr GraphDB::get_vertex_property_column( uint8_t label, const std::string& col_name) const { - return graph_.get_vertex_table(label).get_column(col_name); + return graph_.get_vertex_property_column(label, col_name); +} + +std::shared_ptr GraphDB::get_vertex_id_column( + uint8_t label) const { + return graph_.get_vertex_id_column(label); } AppWrapper GraphDB::CreateApp(uint8_t app_type, int thread_id) { diff --git a/flex/engines/graph_db/database/graph_db.h b/flex/engines/graph_db/database/graph_db.h index d345838f7be3..da24423b16d9 100644 --- a/flex/engines/graph_db/database/graph_db.h +++ b/flex/engines/graph_db/database/graph_db.h @@ -137,6 +137,8 @@ class GraphDB { std::shared_ptr get_vertex_property_column( uint8_t label, const std::string& col_name) const; + std::shared_ptr get_vertex_id_column(uint8_t label) const; + AppWrapper CreateApp(uint8_t app_type, int thread_id); void GetAppInfo(Encoder& result); diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 8173fa65f5f8..ed128bf32605 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ 
-79,33 +79,7 @@ std::shared_ptr GraphDBSession::get_vertex_property_column( std::shared_ptr GraphDBSession::get_vertex_id_column( uint8_t label) const { - if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kInt64) { - return std::make_shared>( - dynamic_cast&>( - db_.graph().lf_indexers_[label].get_keys())); - } else if (db_.graph().lf_indexers_[label].get_type() == - PropertyType::kInt32) { - return std::make_shared>( - dynamic_cast&>( - db_.graph().lf_indexers_[label].get_keys())); - } else if (db_.graph().lf_indexers_[label].get_type() == - PropertyType::kUInt64) { - return std::make_shared>( - dynamic_cast&>( - db_.graph().lf_indexers_[label].get_keys())); - } else if (db_.graph().lf_indexers_[label].get_type() == - PropertyType::kUInt32) { - return std::make_shared>( - dynamic_cast&>( - db_.graph().lf_indexers_[label].get_keys())); - } else if (db_.graph().lf_indexers_[label].get_type() == - PropertyType::kStringView) { - return std::make_shared>( - dynamic_cast&>( - db_.graph().lf_indexers_[label].get_keys())); - } else { - return nullptr; - } + return db_.get_vertex_id_column(label); } Result> GraphDBSession::Eval(const std::string& input) { diff --git a/flex/engines/graph_db/database/read_transaction.h b/flex/engines/graph_db/database/read_transaction.h index 23b93acf3fe3..ef352a6ca903 100644 --- a/flex/engines/graph_db/database/read_transaction.h +++ b/flex/engines/graph_db/database/read_transaction.h @@ -290,11 +290,41 @@ class ReadTransaction { const MutablePropertyFragment& graph() const; + /* + * @brief Get the handle of the vertex property column, only for non-primary + * key columns. + */ const std::shared_ptr get_vertex_property_column( uint8_t label, const std::string& col_name) const { return graph_.get_vertex_table(label).get_column(col_name); } + /** + * @brief Get the handle of the vertex property column, including the primary + * key. + * @tparam T The type of the column. + * @param label The label of the vertex. 
+ * @param col_name The name of the column. + */ + template + const std::shared_ptr> get_vertex_ref_property_column( + uint8_t label, const std::string& col_name) const { + auto pk = graph_.schema().get_vertex_primary_key(label); + CHECK(pk.size() == 1) << "Only support single primary key"; + if (col_name == std::get<1>(pk[0])) { + return std::dynamic_pointer_cast>( + graph_.get_vertex_id_column(label)); + } else { + auto ptr = graph_.get_vertex_table(label).get_column(col_name); + if (ptr) { + return std::dynamic_pointer_cast>( + CreateRefColumn(ptr)); + } else { + return nullptr; + } + } + } + class vertex_iterator { public: vertex_iterator(label_t label, vid_t cur, vid_t num, diff --git a/flex/engines/graph_db/runtime/adhoc/var.cc b/flex/engines/graph_db/runtime/adhoc/var.cc index 3f581aa80d4f..b8cc6e7161bc 100644 --- a/flex/engines/graph_db/runtime/adhoc/var.cc +++ b/flex/engines/graph_db/runtime/adhoc/var.cc @@ -56,25 +56,9 @@ Var::Var(const ReadTransaction& txn, const Context& ctx, if (pt.has_id()) { getter_ = std::make_shared(ctx, tag); } else if (pt.has_key()) { - if (pt.key().name() == "id") { - if (type_ == RTAnyType::kStringValue) { - getter_ = - std::make_shared>( - txn, ctx, tag); - } else if (type_ == RTAnyType::kI32Value) { - getter_ = std::make_shared>( - txn, ctx, tag); - } else if (type_ == RTAnyType::kI64Value) { - getter_ = std::make_shared>( - txn, ctx, tag); - } else { - LOG(FATAL) << "not support for " - << static_cast(type_.type_enum_); - } - } else { - getter_ = create_vertex_property_path_accessor(txn, ctx, tag, type_, - pt.key().name()); - } + getter_ = create_vertex_property_path_accessor(txn, ctx, tag, type_, + pt.key().name()); + } else if (pt.has_label()) { getter_ = create_vertex_label_path_accessor(ctx, tag); } else { @@ -126,23 +110,8 @@ Var::Var(const ReadTransaction& txn, const Context& ctx, if (pt.has_id()) { getter_ = std::make_shared(); } else if (pt.has_key()) { - if (pt.key().name() == "id") { - if (type_ == 
RTAnyType::kStringValue) { - getter_ = - std::make_shared>( - txn); - } else if (type_ == RTAnyType::kI32Value) { - getter_ = std::make_shared>(txn); - } else if (type_ == RTAnyType::kI64Value) { - getter_ = std::make_shared>(txn); - } else { - LOG(FATAL) << "not support for " - << static_cast(type_.type_enum_); - } - } else { - getter_ = create_vertex_property_vertex_accessor(txn, type_, - pt.key().name()); - } + getter_ = create_vertex_property_vertex_accessor(txn, type_, + pt.key().name()); } else if (pt.has_label()) { getter_ = std::make_shared(); } else { diff --git a/flex/engines/graph_db/runtime/common/accessors.h b/flex/engines/graph_db/runtime/common/accessors.h index 33a468a7155d..67f5e94534f4 100644 --- a/flex/engines/graph_db/runtime/common/accessors.h +++ b/flex/engines/graph_db/runtime/common/accessors.h @@ -156,9 +156,8 @@ class VertexPropertyPathAccessor : public IAccessor { int label_num = txn.schema().vertex_label_num(); property_columns_.resize(label_num, nullptr); for (int i = 0; i < label_num; ++i) { - property_columns_[i] = dynamic_cast*>( - txn.get_vertex_property_column(static_cast(i), prop_name) - .get()); + property_columns_[i] = txn.template get_vertex_ref_property_column( + static_cast(i), prop_name); } } @@ -205,7 +204,7 @@ class VertexPropertyPathAccessor : public IAccessor { private: const IVertexColumn& vertex_col_; - std::vector*> property_columns_; + std::vector>> property_columns_; }; class VertexLabelPathAccessor : public IAccessor { @@ -323,9 +322,8 @@ class VertexPropertyVertexAccessor : public IAccessor { int label_num = txn.schema().vertex_label_num(); property_columns_.resize(label_num, nullptr); for (int i = 0; i < label_num; ++i) { - property_columns_[i] = dynamic_cast*>( - txn.get_vertex_property_column(static_cast(i), prop_name) - .get()); + property_columns_[i] = txn.template get_vertex_ref_property_column( + static_cast(i), prop_name); } } @@ -366,7 +364,7 @@ class VertexPropertyVertexAccessor : public IAccessor { } 
private: - std::vector*> property_columns_; + std::vector>> property_columns_; }; class EdgeIdPathAccessor : public IAccessor { diff --git a/flex/engines/graph_db/runtime/common/columns/vertex_columns.h b/flex/engines/graph_db/runtime/common/columns/vertex_columns.h index 108ac48d5e5e..b984e3bd9638 100644 --- a/flex/engines/graph_db/runtime/common/columns/vertex_columns.h +++ b/flex/engines/graph_db/runtime/common/columns/vertex_columns.h @@ -215,6 +215,8 @@ class OptionalSLVertexColumn : public IVertexColumn { ISigColumn* generate_signature() const override; + label_t label() const { return label_; } + private: friend class OptionalSLVertexColumnBuilder; label_t label_; diff --git a/flex/interactive/sdk/generate_sdk.sh b/flex/interactive/sdk/generate_sdk.sh index 4d529e0cef28..e38c33bc9920 100755 --- a/flex/interactive/sdk/generate_sdk.sh +++ b/flex/interactive/sdk/generate_sdk.sh @@ -30,7 +30,7 @@ DEVELOPER_NAME="GraphScope Team" LICENSE_NAME="Apache-2.0" LICENSE_URL="https://www.apache.org/licenses/LICENSE-2.0.html" LOG_LEVEL="error" - +export OPENAPI_GENERATOR_VERSION=7.2.0 #get current bash scrip's directory CUR_DIR=$(cd `dirname $0`; pwd) @@ -129,7 +129,6 @@ function install_generator() { curl https://raw.githubusercontent.com/OpenAPITools/openapi-generator/master/bin/utils/openapi-generator-cli.sh > ~/bin/openapitools/openapi-generator-cli chmod u+x ~/bin/openapitools/openapi-generator-cli export PATH=$PATH:~/bin/openapitools/ - export OPENAPI_GENERATOR_VERSION=7.2.0 fi # on ubuntu apt-get jq on mac brew install jq diff --git a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py index 6282fc4a393b..94f0b34a25f2 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/conftest.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/conftest.py @@ -334,6 +334,20 @@ def create_partial_modern_graph(interactive_session): delete_running_graph(interactive_session, graph_id) 
+@pytest.fixture(scope="function") +def create_graph_with_custom_pk_name(interactive_session): + modern_graph_custom_pk_name = modern_graph_full.copy() + for vertex_type in modern_graph_custom_pk_name["schema"]["vertex_types"]: + vertex_type["properties"][0]["property_name"] = "custom_id" + vertex_type["primary_keys"] = ["custom_id"] + create_graph_request = CreateGraphRequest.from_dict(modern_graph_custom_pk_name) + resp = interactive_session.create_graph(create_graph_request) + assert resp.is_ok() + graph_id = resp.get_value().graph_id + yield graph_id + delete_running_graph(interactive_session, graph_id) + + def wait_job_finish(sess: Session, job_id: str): assert job_id is not None while True: @@ -463,5 +477,5 @@ def update_procedure(sess: Session, graph_id: str, proc_id: str, desc: str): def start_service_on_graph(interactive_session, graph_id: str): resp = interactive_session.start_service(StartServiceRequest(graph_id=graph_id)) assert resp.is_ok() - # wait one second to let compiler get the new graph - time.sleep(1) + # wait three second to let compiler get the new graph + time.sleep(3) diff --git a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py index eba1763b8975..a144a1e07948 100644 --- a/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py +++ b/flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py @@ -18,6 +18,7 @@ import os import sys +from time import sleep import pytest @@ -265,3 +266,27 @@ def test_call_proc_in_cypher(interactive_session, neo4j_session, create_modern_g for record in result: cnt += 1 assert cnt == 8 + + +def test_custom_pk_name( + interactive_session, neo4j_session, create_graph_with_custom_pk_name +): + print("[Test custom pk name]") + import_data_to_full_modern_graph( + interactive_session, create_graph_with_custom_pk_name + ) + start_service_on_graph(interactive_session, create_graph_with_custom_pk_name) + result = 
neo4j_session.run( + "MATCH (n: person) where n.custom_id = 4 return n.custom_id;" + ) + records = result.fetch(10) + for record in records: + print(record) + assert len(records) == 1 + + result = neo4j_session.run( + "MATCH (n:person)-[e]-(v:person) where v.custom_id = 1 return count(e);" + ) + records = result.fetch(1) + assert len(records) == 1 and records[0]["$f0"] == 2 + start_service_on_graph(interactive_session, "1") diff --git a/flex/openapi/openapi_coordinator.yaml b/flex/openapi/openapi_coordinator.yaml index 3927f61f1306..a3029de3c5cc 100644 --- a/flex/openapi/openapi_coordinator.yaml +++ b/flex/openapi/openapi_coordinator.yaml @@ -250,6 +250,9 @@ components: type: string grpc: type: string + info: + type: string + nullable: true start_time: type: string @@ -2196,6 +2199,7 @@ paths: cypher: neo4j://mock.api.cypher:7676 gremlin: ws://mock.api.gremlin/gremlin hqps: http://mock.api.hqps:10000 + info: "Replace 127.0.0.1 with public ip if connecting from outside" start_time: "2024-01-01 00:00:00" 500: $ref: "#/components/responses/500" diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc index ef2f9412b0dc..682a6a460e3b 100644 --- a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc @@ -158,10 +158,25 @@ static std::vector read_header( static void put_delimiter_option(const LoadingConfig& loading_config, arrow::csv::ParseOptions& parse_options) { auto delimiter_str = loading_config.GetDelimiter(); - if (delimiter_str.size() != 1) { - LOG(FATAL) << "Delimiter should be a single character"; + if (delimiter_str.size() != 1 && delimiter_str[0] != '\\') { + LOG(FATAL) << "Delimiter should be a single character, or a escape " + "character, like '\\t'"; + } + if (delimiter_str[0] == '\\') { + if (delimiter_str.size() != 2) { + LOG(FATAL) << "Delimiter should be a single character"; + } + // escape the 
special character + switch (delimiter_str[1]) { + case 't': + parse_options.delimiter = '\t'; + break; + default: + LOG(FATAL) << "Unsupported escape character: " << delimiter_str[1]; + } + } else { + parse_options.delimiter = delimiter_str[0]; } - parse_options.delimiter = delimiter_str[0]; } static bool put_skip_rows_option(const LoadingConfig& loading_config, diff --git a/flex/storages/rt_mutable_graph/mutable_property_fragment.cc b/flex/storages/rt_mutable_graph/mutable_property_fragment.cc index 1cb2c329efd5..22f736fff8a7 100644 --- a/flex/storages/rt_mutable_graph/mutable_property_fragment.cc +++ b/flex/storages/rt_mutable_graph/mutable_property_fragment.cc @@ -496,4 +496,38 @@ const CsrBase* MutablePropertyFragment::get_ie_csr(label_t label, return ie_[index]; } +std::shared_ptr MutablePropertyFragment::get_vertex_property_column( + uint8_t label, const std::string& prop) const { + return vertex_data_[label].get_column(prop); +} + +std::shared_ptr MutablePropertyFragment::get_vertex_id_column( + uint8_t label) const { + if (lf_indexers_[label].get_type() == PropertyType::kInt64) { + return std::make_shared>( + dynamic_cast&>( + lf_indexers_[label].get_keys())); + } else if (lf_indexers_[label].get_type() == PropertyType::kInt32) { + return std::make_shared>( + dynamic_cast&>( + lf_indexers_[label].get_keys())); + } else if (lf_indexers_[label].get_type() == PropertyType::kUInt64) { + return std::make_shared>( + dynamic_cast&>( + lf_indexers_[label].get_keys())); + } else if (lf_indexers_[label].get_type() == PropertyType::kUInt32) { + return std::make_shared>( + dynamic_cast&>( + lf_indexers_[label].get_keys())); + } else if (lf_indexers_[label].get_type() == PropertyType::kStringView) { + return std::make_shared>( + dynamic_cast&>( + lf_indexers_[label].get_keys())); + } else { + LOG(ERROR) << "Unsupported vertex id type: " + << lf_indexers_[label].get_type(); + return nullptr; + } +} + } // namespace gs diff --git 
a/flex/storages/rt_mutable_graph/mutable_property_fragment.h b/flex/storages/rt_mutable_graph/mutable_property_fragment.h index d8bccbe55c85..39fdc9a9f285 100644 --- a/flex/storages/rt_mutable_graph/mutable_property_fragment.h +++ b/flex/storages/rt_mutable_graph/mutable_property_fragment.h @@ -112,6 +112,11 @@ class MutablePropertyFragment { void loadSchema(const std::string& filename); + std::shared_ptr get_vertex_property_column( + uint8_t label, const std::string& prop) const; + + std::shared_ptr get_vertex_id_column(uint8_t label) const; + Schema schema_; std::vector lf_indexers_; std::vector ie_, oe_; diff --git a/interactive_engine/assembly/src/conf/groot/logback.xml b/interactive_engine/assembly/src/conf/groot/logback.xml index 93b56c971b09..4eb219fa22ee 100644 --- a/interactive_engine/assembly/src/conf/groot/logback.xml +++ b/interactive_engine/assembly/src/conf/groot/logback.xml @@ -34,6 +34,23 @@ + + ${log_dir}/perf_metric.log + + ${log_dir}/perf_metric.%d{yyyy-MM-dd}.%i.gz + 7 + 100MB + 500MB + + + [%d{ISO8601}][%p][%t][%c:%L] %m%n + + + + + + + diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java index 4ac59f7a7ad9..ab11c9b198b2 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java @@ -35,6 +35,8 @@ import com.alibaba.graphscope.common.ir.tools.QueryCache; import com.alibaba.graphscope.common.ir.tools.QueryIdGenerator; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; +import com.alibaba.graphscope.common.metric.MemoryMetric; +import com.alibaba.graphscope.common.metric.MetricsTool; import com.alibaba.graphscope.cypher.service.CypherBootstrapper; import com.alibaba.graphscope.gremlin.integration.result.GraphProperties; import 
com.alibaba.graphscope.gremlin.integration.result.TestGraphFactory; @@ -62,6 +64,7 @@ public class GraphServer { private final IrMetaQueryCallback metaQueryCallback; private final GraphProperties testGraph; private final GraphRelOptimizer optimizer; + private final MetricsTool metricsTool; private IrGremlinServer gremlinServer; private CypherBootstrapper cypherBootstrapper; @@ -77,10 +80,13 @@ public GraphServer( this.metaQueryCallback = metaQueryCallback; this.testGraph = testGraph; this.optimizer = optimizer; + this.metricsTool = new MetricsTool(configs); + this.metricsTool.registerMetric(new MemoryMetric()); } public void start() throws Exception { - ExecutionClient executionClient = ExecutionClient.Factory.create(configs, channelFetcher); + ExecutionClient executionClient = + ExecutionClient.Factory.create(configs, channelFetcher, metricsTool); QueryIdGenerator idGenerator = new QueryIdGenerator(configs); QueryCache queryCache = new QueryCache(configs); if (!FrontendConfig.GREMLIN_SERVER_DISABLED.get(configs)) { @@ -95,7 +101,8 @@ public void start() throws Exception { executionClient, channelFetcher, metaQueryCallback, - testGraph); + testGraph, + metricsTool); this.gremlinServer.start(); } if (!FrontendConfig.NEO4J_BOLT_SERVER_DISABLED.get(configs)) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java index 089726d7fe0f..9cf88effecaf 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; +import 
com.alibaba.graphscope.common.metric.MetricsTool; import com.alibaba.graphscope.gremlin.plugin.QueryLogger; /** @@ -45,10 +46,11 @@ public abstract void submit( public abstract void close() throws Exception; public static class Factory { - public static ExecutionClient create(Configs configs, ChannelFetcher channelFetcher) { + public static ExecutionClient create( + Configs configs, ChannelFetcher channelFetcher, MetricsTool metricsTool) { switch (channelFetcher.getType()) { case RPC: - return new RpcExecutionClient(configs, channelFetcher); + return new RpcExecutionClient(configs, channelFetcher, metricsTool); case HTTP: return new HttpExecutionClient(configs, channelFetcher); default: diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java index 1ef68e17a1a0..ca84658a71d2 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java @@ -17,38 +17,41 @@ package com.alibaba.graphscope.common.client; import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.client.metric.RpcExecutorMetric; import com.alibaba.graphscope.common.client.type.ExecutionRequest; import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.PegasusConfig; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; +import com.alibaba.graphscope.common.metric.MetricsTool; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import com.alibaba.pegasus.RpcChannel; import com.alibaba.pegasus.RpcClient; import com.alibaba.pegasus.intf.ResultProcessor; import 
com.alibaba.pegasus.service.protocol.PegasusClient; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; +import io.grpc.ClientInterceptors; import io.grpc.Status; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicReference; +import java.util.List; +import java.util.stream.Collectors; /** * rpc client to send request to pegasus engine service */ public class RpcExecutionClient extends ExecutionClient { - Logger logger = LoggerFactory.getLogger(RpcExecutionClient.class); private final Configs graphConfig; - private final AtomicReference rpcClientRef; - public RpcExecutionClient(Configs graphConfig, ChannelFetcher channelFetcher) { + public RpcExecutionClient( + Configs graphConfig, + ChannelFetcher channelFetcher, + MetricsTool metricsTool) { super(channelFetcher); this.graphConfig = graphConfig; - this.rpcClientRef = new AtomicReference<>(); + metricsTool.registerMetric(new RpcExecutorMetric(channelFetcher)); } @Override @@ -58,10 +61,18 @@ public void submit( QueryTimeoutConfig timeoutConfig, QueryLogger queryLogger) throws Exception { - if (rpcClientRef.get() == null) { - rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch())); - } - RpcClient rpcClient = rpcClientRef.get(); + List interceptChannels = + channelFetcher.fetch().stream() + .map( + k -> + new RpcChannel( + ClientInterceptors.intercept( + k.getChannel(), new RpcInterceptor()))) + .collect(Collectors.toList()); + RpcClient rpcClient = + new RpcClient( + interceptChannels, + ImmutableMap.of(RpcInterceptor.QUERY_LOGGER_OPTION, queryLogger)); PegasusClient.JobRequest jobRequest = PegasusClient.JobRequest.newBuilder() .setPlan( @@ -99,7 +110,8 @@ public void process(PegasusClient.JobResponse jobResponse) { @Override public void finish() { listener.onCompleted(); - queryLogger.info("[compile]: received results from engine"); + queryLogger.info( + "[query][response]: received all responses from all 
servers"); } @Override @@ -113,8 +125,17 @@ public void error(Status status) { @Override public void close() throws Exception { - if (rpcClientRef.get() != null) { - rpcClientRef.get().shutdown(); - } + channelFetcher + .fetch() + .forEach( + k -> { + try { + if (k != null) { + k.shutdown(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java new file mode 100644 index 000000000000..647a16a83d23 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java @@ -0,0 +1,92 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package com.alibaba.graphscope.common.client; + +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; + +import io.grpc.*; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; + +public class RpcInterceptor implements ClientInterceptor { + public static final CallOptions.Key QUERY_LOGGER_OPTION = + CallOptions.Key.create("query-logger"); + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions, + Channel channel) { + return new ForwardingClientCall.SimpleForwardingClientCall( + channel.newCall(methodDescriptor, callOptions)) { + private Instant requestStartTime; + + @Override + public void start(Listener responseListener, Metadata headers) { + requestStartTime = Instant.now(); + QueryLogger queryLogger = callOptions.getOption(QUERY_LOGGER_OPTION); + super.start( + new ForwardingClientCallListener.SimpleForwardingClientCallListener( + responseListener) { + private final AtomicBoolean firstResponseLogged = + new AtomicBoolean(false); + + @Override + public void onMessage(RespT message) { + if (firstResponseLogged.compareAndSet(false, true)) { + long firstResponseTime = + Instant.now().toEpochMilli() + - requestStartTime.toEpochMilli(); + if (queryLogger != null) { + queryLogger.info( + "[query][response]: receive the first response from" + + " the channel {} in {} ms", + channel.authority(), + firstResponseTime); + } + } + super.onMessage(message); + } + + @Override + public void onClose(Status status, Metadata trailers) { + long endTime = Instant.now().toEpochMilli(); + long totalTime = endTime - requestStartTime.toEpochMilli(); + if (queryLogger != null) { + queryLogger.info( + "[query][response]: receive the last response from the" + + " channel {} with status {} in {} ms", + channel.authority(), + status, + totalTime); + } + super.onClose(status, trailers); + } + }, + headers); + if (queryLogger != null) { + queryLogger.info( + "[query][submitted]: submit the query 
to the task queue of channel {}", + channel.authority()); + } + } + }; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java index 2e0830276b8f..f25495bc9528 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java @@ -18,9 +18,10 @@ import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.HiactorConfig; +import com.alibaba.graphscope.common.config.Utils; import java.net.URI; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -29,19 +30,18 @@ */ public class HostURIChannelFetcher implements ChannelFetcher { private static final String schema = "http"; - private Configs graphConfig; + private final List uriChannels; public HostURIChannelFetcher(Configs graphConfig) { - this.graphConfig = graphConfig; + this.uriChannels = + Utils.convertDotString(HiactorConfig.HIACTOR_HOSTS.get(graphConfig)).stream() + .map(k -> URI.create(schema + "://" + k)) + .collect(Collectors.toList()); } @Override public List fetch() { - String hosts = HiactorConfig.HIACTOR_HOSTS.get(graphConfig); - String[] hostsArr = hosts.split(","); - return Arrays.asList(hostsArr).stream() - .map(k -> URI.create(schema + "://" + k)) - .collect(Collectors.toList()); + return Collections.unmodifiableList(uriChannels); } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java index 63f5544c6857..39f980288ceb 100644 --- 
a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java @@ -21,30 +21,30 @@ import com.alibaba.graphscope.common.config.Utils; import com.alibaba.pegasus.RpcChannel; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** * rpc implementation of {@link ChannelFetcher}, init rpc from local config */ public class HostsRpcChannelFetcher implements ChannelFetcher { - private Configs config; + private final List rpcChannels; public HostsRpcChannelFetcher(Configs config) { - this.config = config; + this.rpcChannels = + Utils.convertDotString(PegasusConfig.PEGASUS_HOSTS.get(config)).stream() + .map( + k -> { + String[] host = k.split(":"); + return new RpcChannel(host[0], Integer.valueOf(host[1])); + }) + .collect(Collectors.toList()); } @Override public List fetch() { - List hostAddresses = - Utils.convertDotString(PegasusConfig.PEGASUS_HOSTS.get(config)); - List rpcChannels = new ArrayList<>(); - hostAddresses.forEach( - k -> { - String[] host = k.split(":"); - rpcChannels.add(new RpcChannel(host[0], Integer.valueOf(host[1]))); - }); - return rpcChannels; + return Collections.unmodifiableList(rpcChannels); } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java new file mode 100644 index 000000000000..7c9beee321a8 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java @@ -0,0 +1,66 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. 
+ * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.client.metric; + +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.metric.Metric; +import com.alibaba.pegasus.RpcChannel; +import com.google.common.collect.Maps; + +import io.grpc.ManagedChannel; +import io.grpc.internal.RpcUtils; +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.SingleThreadEventExecutor; + +import java.util.List; +import java.util.Map; + +public class RpcExecutorMetric implements Metric { + private final ChannelFetcher channelFetcher; + + public RpcExecutorMetric(ChannelFetcher channelFetcher) { + this.channelFetcher = channelFetcher; + } + + @Override + public Key getKey() { + return KeyFactory.RPC_CHANNELS_EXECUTOR_QUEUE; + } + + @Override + public Map getValue() { + List channels = channelFetcher.fetch(); + Map values = Maps.newHashMap(); + channels.forEach( + k -> { + ManagedChannel channel = RpcUtils.getDelegateChannel(k.getChannel()); + int queueSize = ValueFactory.INVALID_INT; + Channel nettyChannel = RpcUtils.getNettyChannel(channel); + if (nettyChannel != null) { + EventLoop eventLoop = nettyChannel.eventLoop(); + if (eventLoop instanceof SingleThreadEventExecutor) { + queueSize = ((SingleThreadEventExecutor) eventLoop).pendingTasks(); + } + } + values.put(k.getChannel().authority(), queueSize); + }); 
+ return values; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java index 6a66c31d6496..bbbbe19b7967 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java @@ -63,4 +63,7 @@ public class FrontendConfig { public static final Config QUERY_PRINT_THRESHOLD_MS = Config.longConfig("query.print.threshold.ms", 200l); + + public static final Config METRICS_TOOL_INTERVAL_MS = + Config.longConfig("metrics.tool.interval.ms", 5 * 60 * 1000L); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java index d9ca759e680c..71be66ef2d20 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.GraphConfig; import com.alibaba.graphscope.common.config.PlannerConfig; +import com.alibaba.graphscope.common.ir.meta.GraphId; import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.meta.IrMetaStats; import com.alibaba.graphscope.common.ir.meta.IrMetaTracker; @@ -46,26 +47,27 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable private volatile IrMetaStats currentState; // To manage the state changes of statistics resulting from different update operations. 
private volatile StatsState statsState; - private final boolean fetchStats; + private volatile Boolean statsEnabled = null; public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) { super(dataReader, tracker); this.scheduler = new ScheduledThreadPoolExecutor(1); - this.scheduler.scheduleAtFixedRate( - () -> syncMeta(), - 0, - GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs), - TimeUnit.MILLISECONDS); - this.fetchStats = + long schemaIntervalMS = GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs); + if (schemaIntervalMS > 0) { + logger.info("start to schedule the schema sync task per {} ms", schemaIntervalMS); + this.scheduler.scheduleAtFixedRate( + () -> syncMeta(), schemaIntervalMS, schemaIntervalMS, TimeUnit.MILLISECONDS); + } + boolean isCBOMode = PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs) - && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO"); - if (this.fetchStats) { - logger.info("start to schedule statistics fetch task"); + && PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equalsIgnoreCase("CBO"); + long statsIntervalMS = GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs); + if (!isCBOMode || statsIntervalMS <= 0) { + this.statsEnabled = false; + } else { + logger.info("start to schedule the stats sync task per {} ms", statsIntervalMS); this.scheduler.scheduleAtFixedRate( - () -> syncStats(), - 2000, - GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs), - TimeUnit.MILLISECONDS); + () -> syncStats(), statsIntervalMS, statsIntervalMS, TimeUnit.MILLISECONDS); } } @@ -80,7 +82,6 @@ private synchronized void syncMeta() { logger.debug( "schema from remote: {}", (meta == null) ? 
null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON)); - GraphStatistics curStats; // if the graph id or schema version is changed, we need to update the statistics if (this.currentState == null || !this.currentState.getGraphId().equals(meta.getGraphId()) @@ -89,58 +90,74 @@ private synchronized void syncMeta() { .getVersion() .equals(meta.getSchema().getVersion())) { this.statsState = StatsState.INITIALIZED; - curStats = null; - } else { - curStats = this.currentState.getStatistics(); + this.currentState = + new IrMetaStats( + meta.getGraphId(), + meta.getSnapshotId(), + meta.getSchema(), + meta.getStoredProcedures(), + null); } - this.currentState = - new IrMetaStats( - meta.getGraphId(), - meta.getSnapshotId(), - meta.getSchema(), - meta.getStoredProcedures(), - curStats); - if (this.fetchStats && this.statsState != StatsState.SYNCED) { - logger.info("start to schedule statistics fetch task"); + boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId()); + if (statsEnabled && this.statsState != StatsState.SYNCED + || (!statsEnabled && this.statsState != StatsState.MOCKED)) { + logger.debug("start to sync stats"); syncStats(); } } catch (Throwable e) { - logger.warn("failed to read meta data", e); + logger.warn("failed to read meta data, error is {}", e); + } + } + + private boolean getStatsEnabled(GraphId graphId) { + try { + this.statsEnabled = + (this.statsEnabled == null) + ? 
this.reader.syncStatsEnabled(graphId) + : this.statsEnabled; + return this.statsEnabled; + } catch ( + Throwable e) { // if errors happen when reading stats enabled, we assume it is false + logger.warn("failed to read stats enabled, error is {}", e); + return false; } } private synchronized void syncStats() { try { if (this.currentState != null) { - GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId()); - logger.debug("statistics from remote: {}", stats); - if (stats != null && stats.getVertexCount() != 0) { - this.currentState = - new IrMetaStats( - this.currentState.getSnapshotId(), - this.currentState.getSchema(), - this.currentState.getStoredProcedures(), - stats); - if (tracker != null) { - logger.debug("start to update the glogue"); - tracker.onChanged(this.currentState); + boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId()); + if (statsEnabled) { + GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId()); + logger.debug("statistics from remote: {}", stats); + if (stats != null && stats.getVertexCount() != 0) { + this.currentState = + new IrMetaStats( + this.currentState.getSnapshotId(), + this.currentState.getSchema(), + this.currentState.getStoredProcedures(), + stats); + if (tracker != null) { + logger.info("start to update the glogue"); + tracker.onChanged(this.currentState); + } + this.statsState = StatsState.SYNCED; } - this.statsState = StatsState.SYNCED; } } } catch (Throwable e) { - logger.warn("failed to read graph statistics, error is: " + e); + logger.warn("failed to read graph statistics, error is {}", e); } finally { try { if (this.currentState != null && tracker != null && this.statsState == StatsState.INITIALIZED) { - logger.debug("start to mock the glogue"); + logger.info("start to mock the glogue"); tracker.onChanged(this.currentState); this.statsState = StatsState.MOCKED; } } catch (Throwable t) { - logger.warn("failed to mock the glogue, error is", t); + logger.warn("failed to 
mock the glogue, error is {}", t); } } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java index b49f2115deab..ea8003ac63b9 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java @@ -49,10 +49,7 @@ public void onMatch(RelOptRuleCall relOptRuleCall) { // specific optimization for relational DB scenario. // 3. `JoinByEdge`: Split the pattern by edge, convert a triangle pattern to `JoinByEdge` to // support optimizations in Neo4j. - if (getMaxEdgeNum(graphPattern.getPattern()) > 2) { - (new JoinByVertex(graphPattern, mq, decompositionQueue, queueCapacity)) - .addDecompositions(); - } + (new JoinByVertex(graphPattern, mq, decompositionQueue, queueCapacity)).addDecompositions(); if (config.getForeignKeyMeta() != null) { (new JoinByForeignKey(graphPattern, mq, decompositionQueue, queueCapacity)) .addDecompositions(); @@ -311,10 +308,13 @@ public JoinByVertex( @Override public void addDecompositions() { - List queues = initDecompositions(); - while (!queues.isEmpty()) { - List nextCompositions = getDecompositions(queues.remove(0)); - queues.addAll(nextCompositions); + if (getMaxEdgeNum(graphPattern.getPattern()) > 2) { + List queues = initDecompositions(); + while (!queues.isEmpty()) { + List nextCompositions = + getDecompositions(queues.remove(0)); + queues.addAll(nextCompositions); + } } addPxdInnerVDecompositions(); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java index 
4e468b60aa58..73df191c473a 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java @@ -22,6 +22,7 @@ import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.ConventionTraitDef; import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.plan.volcano.VolcanoPlanner; import org.apache.calcite.rel.RelNode; @@ -59,4 +60,9 @@ protected RelOptCost upperBoundForInputs(RelNode mExpr, RelOptCost upperBound) { if (relCost == null) return null; return cost.plus(relCost); } + + @Override + public void registerSchema(RelOptSchema schema) { + // do nothing + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java index b346cfe0d744..991ae7b89d5f 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java @@ -38,6 +38,7 @@ import com.alibaba.graphscope.common.ir.runtime.proto.GraphRelProtoPhysicalBuilder; import com.alibaba.graphscope.common.ir.type.GraphTypeFactoryImpl; import com.alibaba.graphscope.common.utils.ClassUtils; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import com.alibaba.graphscope.proto.frontend.Code; import com.google.common.collect.Maps; @@ -83,6 +84,11 @@ public GraphPlanner( } public PlannerInstance instance(String query, IrMeta irMeta) { + return instance(query, irMeta, null); + } + + public PlannerInstance instance( + String query, IrMeta irMeta, @Nullable QueryLogger queryLogger) { GraphOptCluster optCluster = 
GraphOptCluster.create(this.optimizer.getMatchPlanner(), this.rexBuilder); RelMetadataQuery mq = @@ -99,7 +105,7 @@ public PlannerInstance instance(String query, IrMeta irMeta) { graphConfig, optCluster, new GraphOptSchema(optCluster, schema)); LogicalPlan logicalPlan = logicalPlanFactory.create(graphBuilder, irMeta, query); - return new PlannerInstance(query, logicalPlan, graphBuilder, irMeta); + return new PlannerInstance(query, logicalPlan, graphBuilder, irMeta, queryLogger); } public class PlannerInstance { @@ -107,13 +113,19 @@ public class PlannerInstance { private final LogicalPlan parsedPlan; private final GraphBuilder graphBuilder; private final IrMeta irMeta; + private @Nullable final QueryLogger queryLogger; public PlannerInstance( - String query, LogicalPlan parsedPlan, GraphBuilder graphBuilder, IrMeta irMeta) { + String query, + LogicalPlan parsedPlan, + GraphBuilder graphBuilder, + IrMeta irMeta, + @Nullable QueryLogger queryLogger) { this.query = query; this.parsedPlan = parsedPlan; this.graphBuilder = graphBuilder; this.irMeta = irMeta; + this.queryLogger = queryLogger; } public LogicalPlan getParsedPlan() { @@ -123,10 +135,16 @@ public LogicalPlan getParsedPlan() { public Summary plan() { LogicalPlan logicalPlan = ClassUtils.callException(() -> planLogical(), Code.LOGICAL_PLAN_BUILD_FAILED); - return new Summary( - logicalPlan, + if (queryLogger != null) { + queryLogger.info("[query][compiled]: logical IR compiled"); + } + PhysicalPlan physicalPlan = ClassUtils.callException( - () -> planPhysical(logicalPlan), Code.PHYSICAL_PLAN_BUILD_FAILED)); + () -> planPhysical(logicalPlan), Code.PHYSICAL_PLAN_BUILD_FAILED); + if (queryLogger != null) { + queryLogger.info("[query][compiled]: physical IR compiled"); + } + return new Summary(logicalPlan, physicalPlan); } public LogicalPlan planLogical() { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/type/GraphTypeFactoryImpl.java 
b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/type/GraphTypeFactoryImpl.java index 15f13bc03d6e..fdb6aa9e7e55 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/type/GraphTypeFactoryImpl.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/type/GraphTypeFactoryImpl.java @@ -86,10 +86,13 @@ public RelDataType createArbitraryMapType( @Override public @Nullable RelDataType leastRestrictive(List types) { if (types.stream().anyMatch(t -> t instanceof GraphLabelType)) { + // union all labels + List unionLabels = Lists.newArrayList(); for (RelDataType type : types) { if (!(type instanceof GraphLabelType)) return null; + unionLabels.addAll(((GraphLabelType) type).getLabelsEntry()); } - return types.get(0); + return new GraphLabelType(unionLabels.stream().distinct().collect(Collectors.toList())); } if (types.stream().anyMatch(t -> t instanceof ArbitraryMapType)) { return leastRestrictiveForArbitraryMapType(types); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/manager/RateLimitExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/manager/RateLimitExecutor.java index 9d8312dd6d93..3dcad741ccb1 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/manager/RateLimitExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/manager/RateLimitExecutor.java @@ -20,14 +20,12 @@ import com.alibaba.graphscope.common.config.FrontendConfig; import com.google.common.util.concurrent.RateLimiter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; public class RateLimitExecutor extends ThreadPoolExecutor { - private static final Logger logger = LoggerFactory.getLogger(RateLimitExecutor.class); private final RateLimiter rateLimiter; + private final AtomicLong 
queryCounter; public RateLimitExecutor( Configs configs, @@ -48,9 +46,15 @@ public RateLimitExecutor( handler); int permitsPerSecond = FrontendConfig.QUERY_PER_SECOND_LIMIT.get(configs); this.rateLimiter = RateLimiter.create(permitsPerSecond); + this.queryCounter = new AtomicLong(0); + } + + public long getQueryCounter() { + return queryCounter.get(); } public Future submit(Runnable task) { + incrementCounter(); if (rateLimiter.tryAcquire()) { return super.submit(task); } @@ -60,4 +64,15 @@ public Future submit(Runnable task) { + " per second. Please increase the QPS limit by the config" + " 'query.per.second.limit' or slow down the query sending speed"); } + + // lock-free + private void incrementCounter() { + while (true) { + long currentValue = queryCounter.get(); + long nextValue = (currentValue == Long.MAX_VALUE) ? 0 : currentValue + 1; + if (queryCounter.compareAndSet(currentValue, nextValue)) { + break; + } + } + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MemoryMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MemoryMetric.java new file mode 100644 index 000000000000..3b28117a0dfb --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MemoryMetric.java @@ -0,0 +1,62 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package com.alibaba.graphscope.common.metric; + +import com.google.common.collect.ImmutableMap; + +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.Map; + +public class MemoryMetric implements Metric { + @Override + public Key getKey() { + return KeyFactory.MEMORY; + } + + @Override + public Map getValue() { + // jvm memory status + long jvmFreeMem = Runtime.getRuntime().freeMemory(); + long jvmTotalMem = Runtime.getRuntime().totalMemory(); + long jvmUsedMem = jvmTotalMem - jvmFreeMem; + + // Direct memory + List bufferPools = + ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + long directUsedMem = ValueFactory.INVALID_LONG; + long directTotalMem = ValueFactory.INVALID_LONG; + for (BufferPoolMXBean bufferPool : bufferPools) { + if ("direct".equalsIgnoreCase(bufferPool.getName())) { + directUsedMem = bufferPool.getMemoryUsed(); + directTotalMem = bufferPool.getTotalCapacity(); + } + } + return ImmutableMap.of( + "jvm.used", + jvmUsedMem, + "jvm.total", + jvmTotalMem, + "direct.used", + directUsedMem, + "direct.total", + directTotalMem); + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/Metric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/Metric.java new file mode 100644 index 000000000000..12f5b6b6dcef --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/Metric.java @@ -0,0 +1,52 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.metric; + +public interface Metric { + + class KeyFactory { + public static final Key MEMORY = new Key("memory.usage"); + public static final Key RPC_CHANNELS_EXECUTOR_QUEUE = + new Key("rpc.channels.executor.queue"); + public static final Key GREMLIN_EXECUTOR_QUEUE = new Key("gremlin.executor.queue"); + public static final Key GREMLIN_QPS = new Key("gremlin.qps"); + } + + class ValueFactory { + public static long INVALID_LONG = -1L; + public static int INVALID_INT = -1; + } + + Key getKey(); + + Value getValue(); + + class Key { + private final String keyName; + + private Key(String keyName) { + this.keyName = keyName; + } + + @Override + public String toString() { + return this.keyName; + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MetricsTool.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MetricsTool.java new file mode 100644 index 000000000000..99ab3b01efba --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MetricsTool.java @@ -0,0 +1,70 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.metric; + +import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.FrontendConfig; +import com.google.common.collect.Lists; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class MetricsTool { + private static Logger logger = LoggerFactory.getLogger("PerfMetricLog"); + private final List metrics; + private final ScheduledExecutorService service; + private final long intervalMS; + + public MetricsTool(Configs configs) { + this.metrics = Lists.newArrayList(); + this.service = new ScheduledThreadPoolExecutor(1); + this.intervalMS = FrontendConfig.METRICS_TOOL_INTERVAL_MS.get(configs); + if (this.intervalMS > 0) { + this.service.scheduleAtFixedRate( + () -> printMetrics(), intervalMS, intervalMS, TimeUnit.MILLISECONDS); + } + } + + public MetricsTool registerMetric(Metric metric) { + if (metrics.stream().anyMatch(k -> k.getKey().equals(metric.getKey()))) { + logger.warn("metric {} already exists", metric.getKey()); + return this; + } + metrics.add(metric); + return this; + } + + private void printMetrics() { + try { + StringBuilder builder = new StringBuilder(); + metrics.forEach( + k -> { + builder.append(k.getKey()).append(":").append(k.getValue()).append("\n"); + }); + logger.info("print perf metrics per {} ms:\n{} \n\n", intervalMS, builder); + 
} catch (Throwable t) { + logger.error("print perf metrics failed", t); + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java index 1d6eeaaacc25..8db5c94c5109 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java @@ -121,13 +121,18 @@ public StatementResult run( null, graphConfig); try { + statusCallback + .getQueryLogger() + .info("[query][received]: query received from the cypher client"); // hack ways to execute routing table or ping statement before executing the real query if (statement.equals(GET_ROUTING_TABLE_STATEMENT) || statement.equals(PING_STATEMENT)) { return super.run(fabricTransaction, statement, parameters); } irMeta = metaQueryCallback.beforeExec(); QueryCache.Key cacheKey = - queryCache.createKey(graphPlanner.instance(statement, irMeta)); + queryCache.createKey( + graphPlanner.instance( + statement, irMeta, statusCallback.getQueryLogger())); QueryCache.Value cacheValue = queryCache.get(cacheKey); Preconditions.checkArgument( cacheValue != null, @@ -137,21 +142,15 @@ public StatementResult run( new GraphPlanner.Summary( cacheValue.summary.getLogicalPlan(), cacheValue.summary.getPhysicalPlan()); - logger.debug( - "cypher query \"{}\", job conf name \"{}\", calcite logical plan {}, hash id" - + " {}", - statement, - jobName, - planSummary.getLogicalPlan().explain(), - cacheKey.hashCode()); + statusCallback + .getQueryLogger() + .info("logical IR plan \n\n {} \n\n", planSummary.getLogicalPlan().explain()); + statusCallback + .getQueryLogger() + .debug("physical IR plan {}", planSummary.getPhysicalPlan().explain()); if (planSummary.getLogicalPlan().isReturnEmpty()) { return StatementResults.initial(); 
} - logger.info( - "cypher query \"{}\", job conf name \"{}\", ir core logical plan {}", - statement, - jobName, - planSummary.getPhysicalPlan().explain()); QueryTimeoutConfig timeoutConfig = getQueryTimeoutConfig(); GraphPlanExecutor executor; if (cacheValue.result != null && cacheValue.result.isCompleted) { @@ -190,6 +189,9 @@ public void execute( listener, timeoutConfig, statusCallback.getQueryLogger()); + statusCallback + .getQueryLogger() + .info("[query][submitted]: physical IR submitted"); } }; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java index b39e2d8eadc6..33256c5959d5 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java @@ -110,6 +110,7 @@ public void request(long l) throws Exception { } if (!recordIterator.hasNext()) { subscriber.onResultCompleted(QueryStatistics.EMPTY); + statusCallback.onSuccessEnd(); } } @@ -136,7 +137,6 @@ public void onNext(IrResult.Record record) { public void onCompleted() { try { this.recordIterator.finish(); - this.statusCallback.onSuccessEnd(); } catch (InterruptedException e) { onError(e); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinExecutorQueueMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinExecutorQueueMetric.java new file mode 100644 index 000000000000..6debae48a772 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinExecutorQueueMetric.java @@ -0,0 +1,48 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. 
+ * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.gremlin.metric; + +import com.alibaba.graphscope.common.metric.Metric; +import com.alibaba.graphscope.gremlin.Utils; + +import org.apache.tinkerpop.gremlin.server.GremlinServer; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +public class GremlinExecutorQueueMetric implements Metric { + private final ExecutorService executorService; + + public GremlinExecutorQueueMetric(GremlinServer server) { + this.executorService = + Utils.getFieldValue(GremlinServer.class, server, "gremlinExecutorService"); + } + + @Override + public Key getKey() { + return KeyFactory.GREMLIN_EXECUTOR_QUEUE; + } + + @Override + public Integer getValue() { + return (executorService instanceof ThreadPoolExecutor) + ? ((ThreadPoolExecutor) executorService).getQueue().size() + : ValueFactory.INVALID_INT; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinQPSMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinQPSMetric.java new file mode 100644 index 000000000000..9919586f22bd --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinQPSMetric.java @@ -0,0 +1,62 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. 
+ * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.gremlin.metric; + +import com.alibaba.graphscope.common.manager.RateLimitExecutor; +import com.alibaba.graphscope.common.metric.Metric; +import com.alibaba.graphscope.gremlin.Utils; + +import org.apache.commons.lang3.time.StopWatch; +import org.apache.tinkerpop.gremlin.server.GremlinServer; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class GremlinQPSMetric implements Metric { + private final ExecutorService executorService; + + public GremlinQPSMetric(GremlinServer server) { + this.executorService = + Utils.getFieldValue(GremlinServer.class, server, "gremlinExecutorService"); + } + + @Override + public Key getKey() { + return KeyFactory.GREMLIN_QPS; + } + + @Override + public Long getValue() { + try { + if (executorService instanceof RateLimitExecutor) { + long startCounter = ((RateLimitExecutor) executorService).getQueryCounter(); + StopWatch watch = StopWatch.createStarted(); + Thread.sleep(2000); + long endCounter = ((RateLimitExecutor) executorService).getQueryCounter(); + long elapsed = watch.getTime(TimeUnit.MILLISECONDS); + // the counter may be reset to 0, so we need to handle this case + startCounter = (endCounter >= startCounter) ? 
startCounter : 0; + return (endCounter - startCounter) * 1000 / elapsed; + } + return ValueFactory.INVALID_LONG; + } catch (InterruptedException t) { + throw new RuntimeException(t); + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index 8ac1f418e99a..64d780a507b3 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -200,7 +200,9 @@ protected void evalOpInternal( new MetricsCollector.Gremlin(evalOpTimer), queryHistogram, configs); - statusCallback.getQueryLogger().info("[compile]: query received"); + statusCallback + .getQueryLogger() + .info("[query][received]: query received from the gremlin client"); QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout()); GremlinExecutor.LifeCycle lifeCycle; switch (language) { @@ -361,7 +363,9 @@ protected GremlinExecutor.LifeCycle createLifeCycle( if (o != null && o instanceof Traversal) { applyStrategies((Traversal) o); } - statusCallback.getQueryLogger().info("[compile]: traversal compiled"); + statusCallback + .getQueryLogger() + .info("[query][compiled]: traversal compiled"); return o; }) .withResult( @@ -406,7 +410,7 @@ protected void processTraversal( return opCollection; }, Code.LOGICAL_PLAN_BUILD_FAILED); - queryLogger.info("[compile]: logical IR compiled"); + queryLogger.info("[query][compiled]: logical IR compiled"); StringBuilder irPlanStr = new StringBuilder(); PegasusClient.JobRequest physicalRequest = ClassUtils.callException( @@ -451,7 +455,7 @@ protected void processTraversal( return request; }, Code.PHYSICAL_PLAN_BUILD_FAILED); - queryLogger.info("[compile]: physical IR 
compiled"); + queryLogger.info("[query][compiled]: physical IR compiled"); Span outgoing; // if exist up trace, useUpTraceId as current traceId if (TraceId.isValid(queryLogger.getUpstreamId())) { @@ -478,6 +482,7 @@ protected void processTraversal( outgoing.setAttribute("query.plan", irPlanStr.toString()); this.rpcClient.submit( physicalRequest, resultProcessor, timeoutConfig.getChannelTimeoutMS()); + queryLogger.info("[query][submitted]: physical IR submitted"); // request results from remote engine service in blocking way resultProcessor.request(); } catch (Throwable t) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java index 1289c9caaef9..27d1228d3982 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java @@ -81,6 +81,7 @@ public GremlinExecutor.LifeCycle get() { b.put("graph.query.cache", queryCache); b.put("graph.planner", graphPlanner); b.put("graph.meta", meta); + b.put("graph.query.logger", statusCallback.getQueryLogger()); }) .withResult( o -> { @@ -95,7 +96,14 @@ public GremlinExecutor.LifeCycle get() { GraphPlanner.Summary summary = value.summary; statusCallback .getQueryLogger() - .debug("ir plan {}", summary.getPhysicalPlan().explain()); + .info( + "logical IR plan \n\n {} \n\n", + summary.getLogicalPlan().explain()); + statusCallback + .getQueryLogger() + .debug( + "physical IR plan {}", + summary.getPhysicalPlan().explain()); ResultSchema resultSchema = new ResultSchema(summary.getLogicalPlan()); GremlinResultProcessor listener = @@ -120,6 +128,9 @@ public GremlinExecutor.LifeCycle get() { listener, timeoutConfig, statusCallback.getQueryLogger()); + statusCallback + 
.getQueryLogger() + .info("[query][submitted]: physical IR submitted"); } // request results from remote engine in a blocking way listener.request(); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/script/GremlinCalciteScriptEngineFactory.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/script/GremlinCalciteScriptEngineFactory.java index 6398e88340b3..010f73837544 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/script/GremlinCalciteScriptEngineFactory.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/script/GremlinCalciteScriptEngineFactory.java @@ -20,6 +20,7 @@ import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.tools.GraphPlanner; import com.alibaba.graphscope.common.ir.tools.QueryCache; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import org.apache.tinkerpop.gremlin.jsr223.AbstractGremlinScriptEngineFactory; import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine; @@ -72,8 +73,9 @@ public Object eval(String script, ScriptContext ctx) throws ScriptException { QueryCache queryCache = (QueryCache) globalBindings.get("graph.query.cache"); GraphPlanner graphPlanner = (GraphPlanner) globalBindings.get("graph.planner"); IrMeta irMeta = (IrMeta) globalBindings.get("graph.meta"); + QueryLogger queryLogger = (QueryLogger) globalBindings.get("graph.query.logger"); QueryCache.Key cacheKey = - queryCache.createKey(graphPlanner.instance(script, irMeta)); + queryCache.createKey(graphPlanner.instance(script, irMeta, queryLogger)); return queryCache.get(cacheKey); } catch (FrontendException e) { throw e; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java 
b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java index b75f83d89166..12f2dbb0584e 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java @@ -108,7 +108,9 @@ public void request() { responseProcessor.process(responseStreamIterator.next()); } responseProcessor.finish(); - statusCallback.getQueryLogger().info("[compile]: process results success"); + statusCallback + .getQueryLogger() + .info("[query][response]: processed and sent all responses to the client"); } catch (Throwable t) { // if the exception is caused by InterruptedException, it means a timeout exception has // been thrown by gremlin executor diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java index 0f6524cab81c..0339d27f940d 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java @@ -92,6 +92,9 @@ public void request() { processRecord(recordStreamIterator.next()); } finishRecord(); + statusCallback + .getQueryLogger() + .info("[query][response]: processed and sent all responses to the client"); } catch (Throwable t) { // if the exception is caused by InterruptedException, it means a timeout exception has // been thrown by gremlin executor diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java index 
8e3659e16f5c..f563c17e83a6 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java @@ -25,12 +25,15 @@ import com.alibaba.graphscope.common.ir.tools.QueryIdGenerator; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; import com.alibaba.graphscope.common.manager.RateLimitExecutor; +import com.alibaba.graphscope.common.metric.MetricsTool; import com.alibaba.graphscope.gremlin.Utils; import com.alibaba.graphscope.gremlin.auth.AuthManager; import com.alibaba.graphscope.gremlin.auth.AuthManagerReference; import com.alibaba.graphscope.gremlin.auth.DefaultAuthManager; import com.alibaba.graphscope.gremlin.integration.processor.IrTestOpProcessor; import com.alibaba.graphscope.gremlin.integration.result.GraphProperties; +import com.alibaba.graphscope.gremlin.metric.GremlinExecutorQueueMetric; +import com.alibaba.graphscope.gremlin.metric.GremlinQPSMetric; import com.alibaba.graphscope.gremlin.plugin.processor.IrOpLoader; import com.alibaba.graphscope.gremlin.plugin.processor.IrStandardOpProcessor; import com.alibaba.graphscope.gremlin.plugin.traversal.IrCustomizedTraversalSource; @@ -62,6 +65,7 @@ public class IrGremlinServer implements AutoCloseable { private final GraphTraversalSource g; private final QueryIdGenerator idGenerator; + private final MetricsTool metricsTool; public IrGremlinServer( Configs configs, @@ -71,7 +75,8 @@ public IrGremlinServer( ExecutionClient executionClient, ChannelFetcher channelFetcher, IrMetaQueryCallback metaQueryCallback, - GraphProperties testGraph) { + GraphProperties testGraph, + MetricsTool metricsTool) { this.configs = configs; this.idGenerator = idGenerator; this.queryCache = queryCache; @@ -91,6 +96,7 @@ public IrGremlinServer( this.settings.evaluationTimeout = FrontendConfig.QUERY_EXECUTION_TIMEOUT_MS.get(configs); this.graph = 
TinkerFactory.createModern(); this.g = this.graph.traversal(IrCustomizedTraversalSource.class); + this.metricsTool = metricsTool; } public void start() throws Exception { @@ -131,6 +137,9 @@ public void start() throws Exception { serverGremlinExecutor.getGraphManager().putTraversalSource("g", graph.traversal()); this.gremlinServer.start().join(); + this.metricsTool + .registerMetric(new GremlinExecutorQueueMetric(this.gremlinServer)) + .registerMetric(new GremlinQPSMetric(this.gremlinServer)); } private ExecutorService createRateLimitExecutor() { diff --git a/interactive_engine/compiler/src/main/java/io/grpc/internal/RpcUtils.java b/interactive_engine/compiler/src/main/java/io/grpc/internal/RpcUtils.java new file mode 100644 index 000000000000..f18e44225118 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/io/grpc/internal/RpcUtils.java @@ -0,0 +1,56 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package io.grpc.internal; + +import com.alibaba.graphscope.gremlin.Utils; + +import io.grpc.Channel; +import io.grpc.ManagedChannel; +import io.grpc.netty.NettyUtils; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Set; + +public class RpcUtils { + public static @Nullable ManagedChannel getDelegateChannel(Channel channel) { + if (channel instanceof ForwardingManagedChannel) { + ManagedChannel delegate = + Utils.getFieldValue(ForwardingManagedChannel.class, channel, "delegate"); + return getDelegateChannel(delegate); + } + return (channel instanceof ManagedChannel) ? (ManagedChannel) channel : null; + } + + public static io.netty.channel.Channel getNettyChannel(ManagedChannel grpcChannel) { + if (grpcChannel instanceof ManagedChannelImpl) { + ManagedChannelImpl channelImpl = (ManagedChannelImpl) grpcChannel; + Set subChannels = + Utils.getFieldValue(ManagedChannelImpl.class, channelImpl, "subchannels"); + if (subChannels != null && !subChannels.isEmpty()) { + ClientTransport transport = subChannels.iterator().next().getTransport(); + while (transport instanceof ForwardingConnectionClientTransport) { + transport = ((ForwardingConnectionClientTransport) transport).delegate(); + } + return NettyUtils.getNettyChannel(transport); + } + } + return null; + } +} diff --git a/interactive_engine/compiler/src/main/java/io/grpc/netty/NettyUtils.java b/interactive_engine/compiler/src/main/java/io/grpc/netty/NettyUtils.java new file mode 100644 index 000000000000..55a70a8be074 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/io/grpc/netty/NettyUtils.java @@ -0,0 +1,37 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package io.grpc.netty; + +import com.alibaba.graphscope.gremlin.Utils; + +import io.grpc.internal.ClientTransport; +import io.netty.channel.Channel; + +import org.checkerframework.checker.nullness.qual.Nullable; + +public class NettyUtils { + public static @Nullable Channel getNettyChannel(ClientTransport transport) { + if (transport instanceof NettyClientTransport) { + NettyClientTransport nettyClientTransport = (NettyClientTransport) transport; + // Access the Netty Channel from NettyClientTransport + return Utils.getFieldValue(NettyClientTransport.class, nettyClientTransport, "channel"); + } + return null; + } +} diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/antlr4x/GraphBuilderTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/antlr4x/GraphBuilderTest.java index 9141dff65570..97279d78a2da 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/antlr4x/GraphBuilderTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/gremlin/antlr4x/GraphBuilderTest.java @@ -28,6 +28,8 @@ import com.alibaba.graphscope.common.ir.tools.GraphStdOperatorTable; import com.alibaba.graphscope.common.ir.tools.config.GraphOpt; import com.alibaba.graphscope.common.ir.tools.config.SourceConfig; +import com.alibaba.graphscope.common.ir.type.ArbitraryMapType; +import com.alibaba.graphscope.common.ir.type.GraphLabelType; import com.alibaba.graphscope.common.ir.type.GraphProperty; import 
com.alibaba.graphscope.common.utils.FileUtils; import com.alibaba.graphscope.gaia.proto.OuterExpression; @@ -40,6 +42,7 @@ import org.antlr.v4.runtime.tree.ParseTree; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.junit.Assert; @@ -86,6 +89,36 @@ public static RelNode eval(String query, GraphBuilder builder) { return visitor.visit(parseTree).build(); } + @Test + public void g_V_elementMap_test() { + GraphRelOptimizer optimizer = new GraphRelOptimizer(configs); + IrMeta irMeta = + Utils.mockIrMeta( + "schema/ldbc.json", + "statistics/ldbc30_statistics.json", + optimizer.getGlogueHolder()); + GraphBuilder builder = Utils.mockGraphBuilder(optimizer, irMeta); + RelNode node = + eval( + "g.V(72057594037928268).as(\"a\").outE(\"KNOWS\").as(\"b\").inV().as(\"c\").select('a'," + + " \"b\").by(elementMap())", + builder); + RelDataType projectType = node.getRowType().getFieldList().get(0).getType(); + RelDataType bValueType = projectType.getValueType(); + Assert.assertTrue(bValueType instanceof ArbitraryMapType); + GraphLabelType labelType = + (GraphLabelType) + ((ArbitraryMapType) bValueType) + .getKeyValueTypeMap().values().stream() + .filter(k -> k.getValue() instanceof GraphLabelType) + .findFirst() + .get() + .getValue(); + // make sure the inferred type contains the label type + Assert.assertTrue( + labelType.getLabelsEntry().stream().anyMatch(k -> k.getLabel().equals("KNOWS"))); + } + @Test public void g_V_test() { RelNode node = eval("g.V()"); diff --git a/interactive_engine/compiler/src/test/resources/logback-test.xml b/interactive_engine/compiler/src/test/resources/logback-test.xml new file mode 100644 index 000000000000..99b422fbc449 --- /dev/null +++ b/interactive_engine/compiler/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ + + + + +       +          [%d{ISO8601}][%p][%t][%c:%L] 
%m%n +       +     + + + + + diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcChannel.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcChannel.java index 29cacd8232cc..1b7502e4ff49 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcChannel.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcChannel.java @@ -15,6 +15,7 @@ */ package com.alibaba.pegasus; +import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.opentelemetry.api.OpenTelemetry; @@ -28,9 +29,9 @@ public class RpcChannel { private static final Logger logger = LoggerFactory.getLogger(RpcChannel.class); - private final ManagedChannel channel; + private final Channel channel; - public RpcChannel(ManagedChannel channel) { + public RpcChannel(Channel channel) { this.channel = channel; } @@ -47,12 +48,17 @@ public RpcChannel(String host, int port, OpenTelemetry openTelemetry) { .build(); } - public ManagedChannel getChannel() { + public Channel getChannel() { return channel; } public void shutdown() throws InterruptedException { - this.channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + if (this.channel instanceof ManagedChannel) { + String name = channel.authority(); + ManagedChannel managedChannel = (ManagedChannel) this.channel; + managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + logger.info("rpc channel {} shutdown successfully", name); + } } public String toString() { diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java index 62939eac6953..e4062d19b03d 100644 --- 
a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java @@ -22,6 +22,7 @@ import com.alibaba.pegasus.service.protocol.PegasusClient.JobRequest; import com.alibaba.pegasus.service.protocol.PegasusClient.JobResponse; +import io.grpc.CallOptions; import io.grpc.Status; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.trace.Span; @@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,6 +51,22 @@ public RpcClient(List channels) { .collect(Collectors.toList()); } + public RpcClient(List channels, Map options) { + this.channels = Objects.requireNonNull(channels); + this.serviceStubs = + channels.stream() + .map( + k -> { + JobServiceStub stub = JobServiceGrpc.newStub(k.getChannel()); + for (Map.Entry entry : + options.entrySet()) { + stub = stub.withOption(entry.getKey(), entry.getValue()); + } + return stub; + }) + .collect(Collectors.toList()); + } + public void submit(JobRequest jobRequest, ResultProcessor processor, long rpcTimeoutMS) { AtomicInteger counter = new AtomicInteger(this.channels.size()); AtomicBoolean finished = new AtomicBoolean(false); diff --git a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs index 33f9d0b1688c..85cffdb74ed0 100644 --- a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs +++ b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs @@ -82,8 +82,10 @@ impl FromStream> for RpcSink { fn on_next(&mut self, resp: Vec) -> FnResult<()> { // todo: use bytes to alleviate copy & allocate cost; let res = pb::JobResponse { job_id: self.job_id, resp }; - self.tx.send(Ok(res)).ok(); - Ok(()) + debug!("rpc send 
response for job {}", self.job_id); + self.tx + .send(Ok(res)) + .map_err(|e| Box::new(e) as Box) } } @@ -115,7 +117,11 @@ impl FromStreamExt> for RpcSink { Status::unknown(format!("{:?}", server_error)) }; - self.tx.send(Err(status)).ok(); + if let Err(e) = self.tx.send(Err(status)) { + error!("rpc send error failure for job {}: {:?}", self.job_id, e); + } else { + info!("rpc send error success for job {}", self.job_id); + } } } @@ -124,8 +130,14 @@ impl Drop for RpcSink { let before_sub = self.peers.fetch_sub(1, Ordering::SeqCst); if before_sub == 1 { if !self.had_error.load(Ordering::SeqCst) { - self.tx.send(Err(Status::ok("ok"))).ok(); + if let Err(e) = self.tx.send(Err(Status::ok("ok"))) { + error!("rpc send complete failure for job {}: {:?}", self.job_id, e); + } else { + info!("rpc send complete success for job {}", self.job_id); + } } + } else { + debug!("rpc send success for job {}, {} left;", self.job_id, before_sub - 1); } } } diff --git a/k8s/dockerfiles/coordinator.Dockerfile b/k8s/dockerfiles/coordinator.Dockerfile index d55086472146..d12cbe99c043 100644 --- a/k8s/dockerfiles/coordinator.Dockerfile +++ b/k8s/dockerfiles/coordinator.Dockerfile @@ -36,7 +36,7 @@ FROM ubuntu:22.04 AS coordinator ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update -y && \ - apt-get install -y sudo python3-pip openmpi-bin curl tzdata && \ + apt-get install -y sudo python3-pip openmpi-bin curl tzdata netcat && \ apt-get clean -y && \ rm -rf /var/lib/apt/lists/* diff --git a/k8s/dockerfiles/graphscope-dev-wheel.Dockerfile b/k8s/dockerfiles/graphscope-dev-wheel.Dockerfile index 4383c3a1bc0b..69e6dded5a1d 100644 --- a/k8s/dockerfiles/graphscope-dev-wheel.Dockerfile +++ b/k8s/dockerfiles/graphscope-dev-wheel.Dockerfile @@ -51,6 +51,7 @@ RUN yum install -y sudo vim && \ yum remove java-1.8.0-openjdk-devel java-1.8.0-openjdk java-1.8.0-openjdk-headless -y && \ yum install java-11-openjdk-devel -y && \ yum clean all -y --enablerepo='*' && \ + yum install -y netcat && \ rm -rf 
/var/cache/yum RUN mkdir -p /opt/graphscope /opt/vineyard && chown -R graphscope:graphscope /opt/graphscope /opt/vineyard diff --git a/k8s/dockerfiles/graphscope-store.Dockerfile b/k8s/dockerfiles/graphscope-store.Dockerfile index 1d8372eac547..c079b2aac43b 100644 --- a/k8s/dockerfiles/graphscope-store.Dockerfile +++ b/k8s/dockerfiles/graphscope-store.Dockerfile @@ -14,6 +14,7 @@ COPY --chown=graphscope:graphscope . /home/graphscope/graphscope COPY --chown=graphscope:graphscope ./interactive_engine/assembly/src/conf/maven.settings.xml /home/graphscope/.m2/settings.xml USER graphscope +RUN rustup toolchain install 1.76.0 && rustup default 1.76.0 RUN cd /home/graphscope/graphscope \ && . ~/.graphscope_env \ diff --git a/k8s/dockerfiles/interactive-entrypoint.sh b/k8s/dockerfiles/interactive-entrypoint.sh index 74eb149b3118..009323f6c31f 100644 --- a/k8s/dockerfiles/interactive-entrypoint.sh +++ b/k8s/dockerfiles/interactive-entrypoint.sh @@ -32,6 +32,9 @@ function usage() { -c, --enable-coordinator: Launch the Interactive service along with Coordinator. Must enable this option if you want to use `gsctl` command-line tool. + -p, --port-mapping: Specify the port mapping for the interactive. + The format is container_port:host_port, multiple mappings are + separated by comma. 
For example, 8080:8081,7777:7778 EOF } @@ -90,6 +93,16 @@ function launch_service() { } function launch_coordinator() { + local host_ports=() + local container_ports=() + if [ -n "$1" ]; then + IFS=',' read -ra port_mappings <<< "$1" + for port_mapping in "${port_mappings[@]}"; do + IFS=':' read -ra ports <<< "$port_mapping" + container_ports+=(${ports[0]}) + host_ports+=(${ports[1]}) + done + fi if $ENABLE_COORDINATOR; then coordinator_config_file="/tmp/coordinator-config.yaml" @@ -102,6 +115,19 @@ launcher_type: hosts session: instance_id: demo EOF + + if [ ${#host_ports[@]} -gt 0 ]; then + echo "interactive:" >> $coordinator_config_file + echo " port_mapping:" >> $coordinator_config_file + for i in "${!host_ports[@]}"; do + echo " ${container_ports[$i]}: ${host_ports[$i]}" >> $coordinator_config_file + done + fi + # i.e + # interactive: + # port_mapping: + # 8080: 8081 + # 7777: 7778 python3 -m gscoordinator --config-file $coordinator_config_file fi } @@ -126,6 +152,15 @@ while [[ $# -gt 0 ]]; do ENABLE_COORDINATOR=true shift ;; + -p | --port-mapping) + shift + if [[ $# -eq 0 || $1 == -* ]]; then + echo "Option -p requires an argument." >&2 + exit 1 + fi + PORT_MAPPING=$1 + shift + ;; -h | --help) usage exit 0 @@ -141,4 +176,4 @@ done prepare_workspace $WORKSPACE launch_service $WORKSPACE -launch_coordinator +launch_coordinator $PORT_MAPPING diff --git a/k8s/dockerfiles/interactive.Dockerfile b/k8s/dockerfiles/interactive.Dockerfile index 4d1881e84ef9..4a818174e5f3 100644 --- a/k8s/dockerfiles/interactive.Dockerfile +++ b/k8s/dockerfiles/interactive.Dockerfile @@ -18,6 +18,7 @@ RUN cd /home/graphscope/GraphScope/ && \ else \ mkdir /home/graphscope/install; \ . 
/home/graphscope/.graphscope_env; \ + rustup toolchain install 1.76.0 && rustup default 1.76.0; \ make interactive-install BUILD_TYPE="$profile" INSTALL_PREFIX=/home/graphscope/install; \ fi diff --git a/k8s/internal/Makefile b/k8s/internal/Makefile index cd7fb8dbbe8b..81736ddf4b52 100644 --- a/k8s/internal/Makefile +++ b/k8s/internal/Makefile @@ -110,6 +110,7 @@ graphscope-manylinux2014-py3-nodocker: sudo sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* && \ sudo yum install java-11-openjdk-devel -y && \ sudo yum remove java-1.8.0-openjdk-devel java-1.8.0-openjdk java-1.8.0-openjdk-headless -y && \ + rustup toolchain install 1.76.0 && rustup default 1.76.0 && \ cd $(WORKING_DIR)/../.. && \ if [[ "${PLATFORM}" == "aarch64" ]]; then \ export AUDITWHEEL_PLAT=manylinux2014_${PLATFORM}; \ diff --git a/python/graphscope/client/rpc.py b/python/graphscope/client/rpc.py index 58f5c131a5fa..6bfa8297aaad 100644 --- a/python/graphscope/client/rpc.py +++ b/python/graphscope/client/rpc.py @@ -41,19 +41,23 @@ class GRPCClient(object): def __init__(self, launcher, endpoint, reconnect=False): """Connect to GRAPE engine at the given :code:`endpoint`.""" # create the gRPC stub - options = [ + self._options = [ ("grpc.max_send_message_length", GS_GRPC_MAX_MESSAGE_LENGTH), ("grpc.max_receive_message_length", GS_GRPC_MAX_MESSAGE_LENGTH), ("grpc.max_metadata_size", GS_GRPC_MAX_MESSAGE_LENGTH), ] + self._endpoint = endpoint self._launcher = launcher self._grpc_utils = GRPCUtils() - self._channel = grpc.insecure_channel(endpoint, options=options) - self._stub = coordinator_service_pb2_grpc.CoordinatorServiceStub(self._channel) + self._stub = self._get_stub() self._session_id = None self._logs_fetching_thread = None self._reconnect = reconnect + def _get_stub(self): + channel = grpc.insecure_channel(self._endpoint, options=self._options) + return coordinator_service_pb2_grpc.CoordinatorServiceStub(channel) + def 
waiting_service_ready(self, timeout_seconds=60): begin_time = time.time() request = message_pb2.HeartBeatRequest() @@ -76,6 +80,9 @@ def waiting_service_ready(self, timeout_seconds=60): logger.warning("Heart beat analytical engine failed, %s", msg) if time.time() - begin_time >= timeout_seconds: raise ConnectionError(f"Connect coordinator timeout, {msg}") + # refresh the channel incase the server became available + if e.code() == grpc.StatusCode.UNAVAILABLE: + self._stub = self._get_stub() time.sleep(1) def connect(self, cleanup_instance=True, dangling_timeout_seconds=60): diff --git a/python/graphscope/config.py b/python/graphscope/config.py index c0294fdb3337..f075cf3a7b10 100644 --- a/python/graphscope/config.py +++ b/python/graphscope/config.py @@ -228,6 +228,12 @@ class VineyardConfig: ) +@dataclass +class InteractiveConfig: + # a map from internal port to external port + port_mapping: Union[dict, None] = None + + @dataclass class CoordinatorConfig: endpoint: Union[str, None] = None @@ -351,6 +357,8 @@ class Config(Serializable): coordinator: CoordinatorConfig = field(default_factory=CoordinatorConfig) # Vineyard configuration. vineyard: VineyardConfig = field(default_factory=VineyardConfig) + # Interactive configuration. + interactive: InteractiveConfig = field(default_factory=InteractiveConfig) # Local cluster configuration. 
hosts_launcher: HostsLauncherConfig = field(default_factory=HostsLauncherConfig) diff --git a/python/graphscope/gsctl/commands/dev.py b/python/graphscope/gsctl/commands/dev.py index c88a7977a12c..98c09e1dcbd6 100644 --- a/python/graphscope/gsctl/commands/dev.py +++ b/python/graphscope/gsctl/commands/dev.py @@ -251,7 +251,6 @@ def deploy( ] if gremlin_port != -1: cmd.extend(["-p", f"{gremlin_port}:8182"]) - image = f"{image_registry}/{type}:{image_tag}" if interactive_config is not None: if not os.path.isfile(interactive_config): click.secho( @@ -263,7 +262,14 @@ def deploy( cmd.extend( ["-v", f"{interactive_config}:{INTERACTIVE_DOCKER_DEFAULT_CONFIG_PATH}"] ) + image = f"{image_registry}/{type}:{image_tag}" cmd.extend([image, "--enable-coordinator"]) + cmd.extend( + [ + "--port-mapping", + f"8080:{coordinator_port},7777:{admin_port},10000:{storedproc_port},7687:{cypher_port}", + ] + ) returncode = run_shell_cmd(cmd, os.getcwd()) if returncode == 0: message = f""" diff --git a/python/requirements.txt b/python/requirements.txt index 7d7d3206f70e..e374967c375d 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,4 +1,4 @@ -Cython>=3.0.0b3 +Cython>=3.0.0b3,<3.1.0 gremlinpython==3.7.0 grpcio>=1.49 grpcio-tools>=1.49