From 3aa268dc0cf060d61bedca5567b1dc4e0f500bfb Mon Sep 17 00:00:00 2001 From: Dongze Li Date: Wed, 12 Jun 2024 16:11:04 +0800 Subject: [PATCH 1/4] Start the service when switching graph context on Interactive --- .../gsctl/commands/interactive/glob.py | 174 ++++++++++----- .../gsctl/commands/interactive/graph.py | 206 ++++++++---------- python/graphscope/gsctl/utils.py | 2 - 3 files changed, 201 insertions(+), 181 deletions(-) diff --git a/python/graphscope/gsctl/commands/interactive/glob.py b/python/graphscope/gsctl/commands/interactive/glob.py index 4167d7277188..ed1847ecee43 100644 --- a/python/graphscope/gsctl/commands/interactive/glob.py +++ b/python/graphscope/gsctl/commands/interactive/glob.py @@ -19,18 +19,22 @@ import click import yaml +from graphscope.gsctl.impl import bind_datasource_in_batch from graphscope.gsctl.impl import create_graph from graphscope.gsctl.impl import delete_graph_by_id +from graphscope.gsctl.impl import delete_job_by_id from graphscope.gsctl.impl import get_datasource_by_id from graphscope.gsctl.impl import get_graph_id_by_name +from graphscope.gsctl.impl import get_job_by_id from graphscope.gsctl.impl import list_graphs from graphscope.gsctl.impl import list_jobs from graphscope.gsctl.impl import list_service_status from graphscope.gsctl.impl import list_stored_procedures -from graphscope.gsctl.impl import restart_service from graphscope.gsctl.impl import start_service -from graphscope.gsctl.impl import stop_service +from graphscope.gsctl.impl import submit_dataloading_job from graphscope.gsctl.impl import switch_context +from graphscope.gsctl.impl import unbind_edge_datasource +from graphscope.gsctl.impl import unbind_vertex_datasource from graphscope.gsctl.utils import TreeDisplay from graphscope.gsctl.utils import err from graphscope.gsctl.utils import info @@ -47,19 +51,19 @@ def cli(): @cli.group() def create(): - """Create a new graph in database""" + """Create graph, data source, loader job from file""" pass @cli.group() def delete(): - """Delete a graph by identifier""" + """Delete graph, data source, loader job by id""" pass @cli.group() -def service(): - """Start, stop, and restart the database service""" +def desc(): + """Show job's details by id""" pass @@ -71,7 +75,13 @@ def service(): def use(context, graph_identifier): """Switch to GRAPH context, see identifier with `ls` command""" try: - switch_context(get_graph_id_by_name(graph_identifier)) + graph_identifier = get_graph_id_by_name(graph_identifier) + status = list_service_status() + for s in status: + if s.graph_id == graph_identifier and s.status != "Running": + info(f"Starting service on graph {graph_identifier}...") + start_service(graph_identifier) + switch_context(graph_identifier) except Exception as e: err(f"Failed to switch context: {str(e)}") else: @@ -89,7 +99,7 @@ def ls(l): # noqa: F811, E741 # schema tree.create_graph_node(g, recursive=l) if l: - # data source mappin + # data source mapping datasource_mapping = get_datasource_by_id(g.id) tree.create_datasource_mapping_node(g, datasource_mapping) # stored procedure @@ -139,79 +149,127 @@ def graph(graph_identifier): # noqa: F811 succ(f"Delete graph {graph_identifier} successfully.") -@service.command -def stop(): # noqa: F811 - """Stop current database service""" +@create.command +@click.option( + "-g", + "--graph_identifier", + required=True, + help="See graph identifier with `ls` command", +) +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file", +) +def datasource(graph_identifier, filename): # noqa: F811 + """Bind data source mapping from file""" + if not is_valid_file_path(filename): + err(f"Invalid file: {filename}") + return + graph_identifier = get_graph_id_by_name(graph_identifier) + try: + datasource = read_yaml_file(filename) + bind_datasource_in_batch(graph_identifier, datasource) + except Exception as e: + err(f"Failed to bind data source: {str(e)}") + else: + succ("Bind data source successfully.") + + +@delete.command +@click.option( + "-g", + "--graph_identifier", + required=True, + help="See graph identifier with `ls` command", +) +@click.option( + "-t", + "--type", + required=True, + help="Vertex or edge type", +) +@click.option( + "-s", + "--source_vertex_type", + required=False, + help="Source vertex type of the edge [edge only]", +) +@click.option( + "-d", + "--destination_vertex_type", + required=False, + help="Destination vertex type of the edge [edge only]", +) +def datasource( # noqa: F811 + graph_identifier, type, source_vertex_type, destination_vertex_type +): + """Unbind data source mapping on vertex or edge type""" try: - stop_service() + graph_identifier = get_graph_id_by_name(graph_identifier) + if source_vertex_type is not None and destination_vertex_type is not None: + unbind_edge_datasource( + graph_identifier, type, source_vertex_type, destination_vertex_type + ) + else: + unbind_vertex_datasource(graph_identifier, type) except Exception as e: - err(f"Failed to stop service: {str(e)}") + err(f"Failed to unbind data source: {str(e)}") else: - succ("Service stopped.") + succ("Unbind data source successfully.") -@service.command +@create.command() @click.option( "-g", "--graph_identifier", required=True, help="See graph identifier with `ls` command", ) -def start(graph_identifier): # noqa: F811 - """Start database service on a certain graph""" +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file", +) +def loaderjob(graph_identifier, filename): # noqa: F811 + """Create a data loading job from file""" + if not is_valid_file_path(filename): + err(f"Invalid file: {filename}") + return + graph_identifier = get_graph_id_by_name(graph_identifier) try: - start_service(get_graph_id_by_name(graph_identifier)) + config = read_yaml_file(filename) + jobid = submit_dataloading_job(graph_identifier, config) except Exception as e: - err(f"Failed to start service on graph {graph_identifier}: {str(e)}") + err(f"Failed to create a job: {str(e)}") else: - succ(f"Start service on graph {graph_identifier} successfully") + succ(f"Create job {jobid} successfully.") -@service.command -def restart(): # noqa: F811 - """Restart database service on current graph""" +@delete.command() +@click.argument("identifier", required=True) +def job(identifier): # noqa: F811 + """Cancel a job, see identifier with `ls` command""" try: - restart_service() + delete_job_by_id(identifier) except Exception as e: - err(f"Failed to restart service: {str(e)}") + err(f"Failed to delete job {identifier}: {str(e)}") else: - succ("Service restarted.") - - -@service.command -def ls(): # noqa: F811 - """Display current service status""" - - def _construct_and_display_data(status): - head = [ - "STATUS", - "SERVING_GRAPH(IDENTIFIER)", - "CYPHER_ENDPOINT", - "HQPS_ENDPOINT", - "GREMLIN_ENDPOINT", - ] - data = [head] - for s in status: - if s.status == "Stopped": - data.append([s.status, s.graph_id, "-", "-", "-"]) - else: - data.append( - [ - s.status, - s.graph_id, - s.sdk_endpoints.cypher, - s.sdk_endpoints.hqps, - s.sdk_endpoints.gremlin, - ] - ) - terminal_display(data) + succ(f"Delete job {identifier} successfully.") + +@desc.command() +@click.argument("identifier", required=True) +def job(identifier): # noqa: F811 + """Show details of job, see identifier with `ls` command""" try: - status = list_service_status() + job = get_job_by_id(identifier) except Exception as e: - err(f"Failed to list service status: {str(e)}") + err(f"Failed to get job: {str(e)}") else: - _construct_and_display_data(status) + info(yaml.dump(job.to_dict())) if __name__ == "__main__": diff --git a/python/graphscope/gsctl/commands/interactive/graph.py b/python/graphscope/gsctl/commands/interactive/graph.py index 942972f3db1e..640ff5a12d6f 100644 --- a/python/graphscope/gsctl/commands/interactive/graph.py +++ b/python/graphscope/gsctl/commands/interactive/graph.py @@ -20,26 +20,22 @@ import yaml from graphscope.gsctl.config import get_current_context -from graphscope.gsctl.impl import bind_datasource_in_batch from graphscope.gsctl.impl import create_stored_procedure -from graphscope.gsctl.impl import delete_job_by_id from graphscope.gsctl.impl import delete_stored_procedure_by_id from graphscope.gsctl.impl import get_datasource_by_id -from graphscope.gsctl.impl import get_graph_id_by_name -from graphscope.gsctl.impl import get_job_by_id from graphscope.gsctl.impl import list_graphs -from graphscope.gsctl.impl import list_jobs +from graphscope.gsctl.impl import list_service_status from graphscope.gsctl.impl import list_stored_procedures -from graphscope.gsctl.impl import submit_dataloading_job +from graphscope.gsctl.impl import start_service +from graphscope.gsctl.impl import stop_service from graphscope.gsctl.impl import switch_context -from graphscope.gsctl.impl import unbind_edge_datasource -from graphscope.gsctl.impl import unbind_vertex_datasource from graphscope.gsctl.utils import TreeDisplay from graphscope.gsctl.utils import err from graphscope.gsctl.utils import info from graphscope.gsctl.utils import is_valid_file_path from graphscope.gsctl.utils import read_yaml_file from graphscope.gsctl.utils import succ +from graphscope.gsctl.utils import terminal_display @click.group() @@ -49,19 +45,25 @@ def cli(): @cli.group() def create(): - """Create stored procedure, data source, loader job from file""" + """Create stored procedure from file""" pass @cli.group() def delete(): - """Delete stored procedure, data source, loader job by id""" + """Delete stored procedure by id""" pass @cli.group() def desc(): - """Show details of job status and stored procedure by id""" + """Show stored procedure's details by id""" + pass + + +@cli.group() +def service(): + """Start, stop, and restart the database service""" pass @@ -73,7 +75,7 @@ def use(): @cli.command() def ls(): # noqa: F811 - """Display schema, stored procedure, and job information""" + """Display schema and stored procedure information""" tree = TreeDisplay() # context current_context = get_current_context() @@ -92,9 +94,6 @@ def ls(): # noqa: F811 # stored procedure stored_procedures = list_stored_procedures(using_graph.id) tree.create_stored_procedure_node(using_graph, stored_procedures) - # job - jobs = list_jobs() - tree.create_job_node(using_graph, jobs) except Exception as e: err(f"Failed to display graph information: {str(e)}") else: @@ -138,112 +137,6 @@ def storedproc(identifier): # noqa: F811 succ(f"Delete stored procedure {identifier} successfully.") -@create.command -@click.option( - "-f", - "--filename", - required=True, - help="Path of yaml file", -) -def datasource(filename): # noqa: F811 - """Bind data source mapping from file""" - if not is_valid_file_path(filename): - err(f"Invalid file: {filename}") - return - current_context = get_current_context() - graph_identifier = current_context.context - try: - datasource = read_yaml_file(filename) - bind_datasource_in_batch(graph_identifier, datasource) - except Exception as e: - err(f"Failed to bind data source: {str(e)}") - else: - succ("Bind data source successfully.") - - -@delete.command -@click.option( - "-t", - "--type", - required=True, - help="Vertex or edge type", -) -@click.option( - "-s", - "--source_vertex_type", - required=False, - help="Source vertex type of the edge [edge only]", -) -@click.option( - "-d", - "--destination_vertex_type", - required=False, - help="Destination vertex type of the edge [edge only]", -) -def datasource(type, source_vertex_type, destination_vertex_type): # noqa: F811 - """Unbind data source mapping on vertex or edge type""" - try: - current_context = get_current_context() - graph_identifier = current_context.context - if source_vertex_type is not None and destination_vertex_type is not None: - unbind_edge_datasource( - graph_identifier, type, source_vertex_type, destination_vertex_type - ) - else: - unbind_vertex_datasource(graph_identifier, type) - except Exception as e: - err(f"Failed to unbind data source: {str(e)}") - else: - succ("Unbind data source successfully.") - - -@create.command() -@click.option( - "-f", - "--filename", - required=True, - help="Path of yaml file", -) -def loaderjob(filename): # noqa: F811 - """Create a data loading job from file""" - if not is_valid_file_path(filename): - err(f"Invalid file: {filename}") - return - current_context = get_current_context() - graph_identifier = current_context.context - try: - config = read_yaml_file(filename) - jobid = submit_dataloading_job(graph_identifier, config) - except Exception as e: - err(f"Failed to create a job: {str(e)}") - else: - succ(f"Create job {jobid} successfully.") - - -@delete.command() -@click.argument("identifier", required=True) -def job(identifier): # noqa: F811 - """Cancel a job, see identifier with `ls` command""" - try: - delete_job_by_id(identifier) - except Exception as e: - err(f"Failed to delete job {identifier}: {str(e)}") - else: - succ(f"Delete job {identifier} successfully.") - - -@desc.command() -@click.argument("identifier", required=True) -def job(identifier): # noqa: F811 - """Show details of job, see identifier with `ls` command""" - try: - job = get_job_by_id(identifier) - except Exception as e: - err(f"Failed to get job: {str(e)}") - else: - info(yaml.dump(job.to_dict())) - - @desc.command() @click.argument("identifier", required=True) def storedproc(identifier): # noqa: F811 @@ -268,6 +161,77 @@ def storedproc(identifier): # noqa: F811 err(f"Stored Procedure {identifier} not found on {graph_id}.") +@service.command +def stop(): # noqa: F811 + """Stop current database service""" + try: + stop_service() + except Exception as e: + err(f"Failed to stop service: {str(e)}") + else: + succ("Service stopped.") + + +@service.command +def start(): # noqa: F811 + """Start current database service""" + try: + current_context = get_current_context() + graph_identifier = current_context.context + + status = list_service_status() + for s in status: + if s.graph_id == graph_identifier: + if s.status != "Running": + info(f"Starting service on graph {graph_identifier}...") + start_service(graph_identifier) + succ("Service restarted.") + else: + info("Service is running...") + except Exception as e: + err(f"Failed to start service: {str(e)}") + + +@service.command +def status(): # noqa: F811 + """Display current service status""" + + def _construct_and_display_data(status): + current_context = get_current_context() + graph_identifier = current_context.context + + head = [ + "STATUS", + "SERVING_GRAPH(IDENTIFIER)", + "CYPHER_ENDPOINT", + "HQPS_ENDPOINT", + "GREMLIN_ENDPOINT", + ] + data = [head] + for s in status: + if s.graph_id == graph_identifier: + if s.status == "Stopped": + data.append([s.status, s.graph_id, "-", "-", "-"]) + else: + data.append( + [ + s.status, + s.graph_id, + s.sdk_endpoints.cypher, + s.sdk_endpoints.hqps, + s.sdk_endpoints.gremlin, + ] + ) + terminal_display(data) + + try: + status = list_service_status() + except Exception as e: + err(f"Failed to list service status: {str(e)}") + else: + _construct_and_display_data(status) + + @use.command(name="GLOBAL") def _global(): """Switch back to the global scope""" diff --git a/python/graphscope/gsctl/utils.py b/python/graphscope/gsctl/utils.py index 167cbe20839a..f5fb39b894cb 100644 --- a/python/graphscope/gsctl/utils.py +++ b/python/graphscope/gsctl/utils.py @@ -483,7 +483,5 @@ def show(self, graph_identifier=None, stdout=False, sorting=False): f"{graph_identifier}_stored_procedure" ) click.secho(stored_procedure_tree.show(stdout=False, sorting=False)) - job_tree = self.tree.subtree(f"{graph_identifier}_job") - click.secho(job_tree.show(stdout=False, sorting=False)) else: click.secho(self.tree.show(stdout=stdout, sorting=sorting)) From 90f364da1c1b756b799a8e8ebf04b01eeb45e076 Mon Sep 17 00:00:00 2001 From: Dongze Li Date: Wed, 12 Jun 2024 16:38:14 +0800 Subject: [PATCH 2/4] Update gsctl doc --- Makefile | 4 +--- docs/flex/coordinator.md | 2 +- docs/utilities/gs.md | 9 ++++----- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index 1c48747da292..683498372d88 100644 --- a/Makefile +++ b/Makefile @@ -79,9 +79,7 @@ clean: gsctl: cd $(CLIENT_DIR) && \ - python3 setup_gsctl.py bdist_wheel && \ - python3 -m pip install dist/gsctl*.whl --force-reinstall && \ - rm -fr build + python3 ./setup_gsctl.py develop --user client: learning diff --git a/docs/flex/coordinator.md b/docs/flex/coordinator.md index d1290a7154c4..0b47897a8eb5 100644 --- a/docs/flex/coordinator.md +++ b/docs/flex/coordinator.md @@ -1,4 +1,4 @@ -# GraphScope Coordinator +# Coordinator The GraphScope Coordinator serves as a centralized entry point for users, providing a RESTful API that follows the Swagger specification. It supports multiple language SDKs, including Python, and offers a unified interface. The main purpose of the Coordinator is to abstract and standardize the underlying engines and storage systems, shielding users from their complexities. This allows users to interact with the GraphScope platform through a simplified and consistent set of APIs, making it easier for users to understand and utilize the functionalities provided by GraphScope. diff --git a/docs/utilities/gs.md b/docs/utilities/gs.md index 903828e6b559..469bde45cc0e 100644 --- a/docs/utilities/gs.md +++ b/docs/utilities/gs.md @@ -1,23 +1,22 @@ # Command-line Utility `gsctl` -`gsctl` is a command-line utility for GraphScope. It is shipped with `graphscope-client` and provides a set of functionalities to make it easy to use GraphScope. These functionalities include building and testing binaries, managing sessions and resources, and more. +`gsctl` is a command-line utility for GraphScope. It provides a set of functionalities to make it easy to use GraphScope. These functionalities include building and testing binaries, managing sessions and resources, and more. ## Install/Update `gsctl` -Since it is shipped with python package `graphscope-client`, the `gsctl` command will be available in your terminal after installing GraphScope: ```bash -$ pip3 install graphscope-client +$ pip3 install gsctl ``` In some cases, such as development on `gsctl`, you may want to build it from source. To do this, navigate to the directory where the source code is located and run the following command: ```bash -$ cd REPO_HOME/python +$ cd REPO_HOME # If you want to develop gsctl, # please note the entry point is located on: # /python/graphscope/gsctl/gsctl.py -$ pip3 install --editable . +$ make gsctl ``` This will install `gsctl` in an editable mode, which means that any changes you make to the source code will be reflected in the installed version of `gsctl`. From 5f7d4e372c5205e629ed637f8f08958dd95d64b4 Mon Sep 17 00:00:00 2001 From: Dongze Li Date: Thu, 13 Jun 2024 17:56:39 +0800 Subject: [PATCH 3/4] Return metadata when uploading file --- .../gscoordinator/flex/core/client_wrapper.py | 11 +++--- coordinator/gscoordinator/flex/core/utils.py | 24 +++++++++++++ .../flex/models/upload_file_response.py | 34 +++++++++++++++++-- .../gscoordinator/flex/openapi/openapi.yaml | 12 +++++++ flex/openapi/openapi_coordinator.yaml | 9 +++++ .../flex/rest/models/upload_file_response.py | 6 ++-- python/graphscope/gsctl/commands/__init__.py | 2 +- .../gsctl/commands/interactive/glob.py | 6 ++-- .../gsctl/commands/interactive/graph.py | 8 +++-- python/graphscope/gsctl/config.py | 7 ++++ python/graphscope/gsctl/impl/__init__.py | 1 + python/graphscope/gsctl/impl/graph.py | 8 +++++ python/graphscope/gsctl/impl/utils.py | 3 +- 13 files changed, 116 insertions(+), 15 deletions(-) diff --git a/coordinator/gscoordinator/flex/core/client_wrapper.py b/coordinator/gscoordinator/flex/core/client_wrapper.py index 424af453cfde..30068d4ece66 100644 --- a/coordinator/gscoordinator/flex/core/client_wrapper.py +++ b/coordinator/gscoordinator/flex/core/client_wrapper.py @@ -37,6 +37,7 @@ from gscoordinator.flex.core.insight import init_groot_client from gscoordinator.flex.core.interactive import init_hqps_client from gscoordinator.flex.core.utils import encode_datetime +from gscoordinator.flex.core.utils import parse_file_metadata from gscoordinator.flex.models import CreateDataloadingJobResponse from gscoordinator.flex.models import CreateEdgeType from gscoordinator.flex.models import CreateGraphRequest @@ -289,7 +290,8 @@ def upload_file(self, filestorage) -> str: if CLUSTER_TYPE == "HOSTS": filepath = os.path.join(DATASET_WORKSPACE, filestorage.filename) filestorage.save(filepath) - return UploadFileResponse.from_dict({"file_path": filepath}) + metadata = parse_file_metadata(filepath) + return UploadFileResponse.from_dict({"file_path": filepath, "metadata": metadata}) def bind_datasource_in_batch( self, graph_id: str, schema_mapping: SchemaMapping @@ -301,9 +303,10 @@ def bind_datasource_in_batch( schema_mapping_dict["vertex_mappings"], schema_mapping_dict["edge_mappings"], ): - for column_mapping in mapping["column_mappings"]: - if "_property" in column_mapping: - column_mapping["property"] = column_mapping.pop("_property") + if "column_mappings" in mapping and mapping["column_mappings"] is not None: + for column_mapping in mapping["column_mappings"]: + if "_property" in column_mapping: + column_mapping["property"] = column_mapping.pop("_property") if ( "source_vertex_mappings" in mapping and "destination_vertex_mappings" in mapping diff --git a/coordinator/gscoordinator/flex/core/utils.py b/coordinator/gscoordinator/flex/core/utils.py index ff6c65fa9c26..609ab85dbf8e 100644 --- a/coordinator/gscoordinator/flex/core/utils.py +++ b/coordinator/gscoordinator/flex/core/utils.py @@ -19,7 +19,9 @@ import datetime import functools import logging +import os import random +import re import socket import string from typing import Union @@ -83,6 +85,28 @@ def get_internal_ip() -> str: return internal_ip +def parse_file_metadata(location: str) -> dict: + """ + Args: + location: optional values: + odps://path/to/file, hdfs://path/to/file, file:///path/to/file + /home/graphscope/path/to/file + """ + metadata = {"datasource": "file"} + path = location + pattern = r"^(odps|hdfs|file|oss|s3)?://([\w/.-]+)$" + match = re.match(pattern, location) + if match: + datasource = match.group(1) + metadata["datasource"] = datasource + if datasource == "file": + path = match.group(2) + if metadata["datasource"] == "file": + _, file_extension = os.path.splitext(path) + metadata["file_type"] = file_extension[1:] + return metadata + + def get_public_ip() -> Union[str, None]: try: response = requests.get("https://api.ipify.org?format=json") diff --git a/coordinator/gscoordinator/flex/models/upload_file_response.py b/coordinator/gscoordinator/flex/models/upload_file_response.py index b14ecb242d1b..30f289473916 100644 --- a/coordinator/gscoordinator/flex/models/upload_file_response.py +++ b/coordinator/gscoordinator/flex/models/upload_file_response.py @@ -12,21 +12,26 @@ class UploadFileResponse(Model): Do not edit the class manually. """ - def __init__(self, file_path=None): # noqa: E501 + def __init__(self, file_path=None, metadata=None): # noqa: E501 """UploadFileResponse - a model defined in OpenAPI :param file_path: The file_path of this UploadFileResponse. # noqa: E501 :type file_path: str + :param metadata: The metadata of this UploadFileResponse. # noqa: E501 + :type metadata: Dict[str, object] """ self.openapi_types = { - 'file_path': str + 'file_path': str, + 'metadata': Dict[str, object] } self.attribute_map = { - 'file_path': 'file_path' + 'file_path': 'file_path', + 'metadata': 'metadata' } self._file_path = file_path + self._metadata = metadata @classmethod def from_dict(cls, dikt) -> 'UploadFileResponse': @@ -61,3 +66,26 @@ def file_path(self, file_path: str): raise ValueError("Invalid value for `file_path`, must not be `None`") # noqa: E501 self._file_path = file_path + + @property + def metadata(self) -> Dict[str, object]: + """Gets the metadata of this UploadFileResponse. + + + :return: The metadata of this UploadFileResponse. + :rtype: Dict[str, object] + """ + return self._metadata + + @metadata.setter + def metadata(self, metadata: Dict[str, object]): + """Sets the metadata of this UploadFileResponse. + + + :param metadata: The metadata of this UploadFileResponse. + :type metadata: Dict[str, object] + """ + if metadata is None: + raise ValueError("Invalid value for `metadata`, must not be `None`") # noqa: E501 + + self._metadata = metadata diff --git a/coordinator/gscoordinator/flex/openapi/openapi.yaml b/coordinator/gscoordinator/flex/openapi/openapi.yaml index 9e93c04bdaa1..5f67843adf4b 100644 --- a/coordinator/gscoordinator/flex/openapi/openapi.yaml +++ b/coordinator/gscoordinator/flex/openapi/openapi.yaml @@ -473,6 +473,11 @@ paths: "200": content: application/json: + example: + file_path: /home/graphscope/path/to/file.csv + metadata: + datasource: file + file_type: csv schema: $ref: '#/components/schemas/UploadFileResponse' description: successful operation @@ -2008,12 +2013,19 @@ components: UploadFileResponse: example: file_path: file_path + metadata: + key: "" properties: file_path: title: file_path type: string + metadata: + additionalProperties: true + title: metadata + type: object required: - file_path + - metadata title: UploadFileResponse LongText: properties: diff --git a/flex/openapi/openapi_coordinator.yaml b/flex/openapi/openapi_coordinator.yaml index 9201723ce6a4..f71388f4c937 100644 --- a/flex/openapi/openapi_coordinator.yaml +++ b/flex/openapi/openapi_coordinator.yaml @@ -169,9 +169,13 @@ components: UploadFileResponse: required: - file_path + - metadata properties: file_path: type: string + metadata: + type: object + additionalProperties: true LongText: required: @@ -2297,5 +2301,10 @@ paths: application/json: schema: $ref: '#/components/schemas/UploadFileResponse' + example: + file_path: /home/graphscope/path/to/file.csv + metadata: + datasource: file + file_type: csv 500: $ref: "#/components/responses/500" diff --git a/python/graphscope/flex/rest/models/upload_file_response.py b/python/graphscope/flex/rest/models/upload_file_response.py index 5d90a08bef55..29daa96ec0ff 100644 --- a/python/graphscope/flex/rest/models/upload_file_response.py +++ b/python/graphscope/flex/rest/models/upload_file_response.py @@ -28,7 +28,8 @@ class UploadFileResponse(BaseModel): UploadFileResponse """ # noqa: E501 file_path: StrictStr - __properties: ClassVar[List[str]] = ["file_path"] + metadata: Dict[str, Any] + __properties: ClassVar[List[str]] = ["file_path", "metadata"] model_config = { "populate_by_name": True, @@ -81,7 +82,8 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: return cls.model_validate(obj) _obj = cls.model_validate({ - "file_path": obj.get("file_path") + "file_path": obj.get("file_path"), + "metadata": obj.get("metadata") }) return _obj diff --git a/python/graphscope/gsctl/commands/__init__.py b/python/graphscope/gsctl/commands/__init__.py index 5998bb958dd7..2ae55c6b1d56 100644 --- a/python/graphscope/gsctl/commands/__init__.py +++ b/python/graphscope/gsctl/commands/__init__.py @@ -108,7 +108,7 @@ def get_command_collection(context: Context): commands = click.CommandCollection(sources=[common, interactive]) else: if len(sys.argv) < 2 or sys.argv[1] != "use": - info(f"Using GRAPH {context.context}.", fg="green", bold=True) + info(f"Using GRAPH {context.graph_name}.", fg="green", bold=True) info("Run `gsctl use GLOBAL` to switch back to GLOBAL context.\n") commands = click.CommandCollection(sources=[common, interactive_graph]) elif is_insight_mode(context.flex): diff --git a/python/graphscope/gsctl/commands/interactive/glob.py b/python/graphscope/gsctl/commands/interactive/glob.py index ed1847ecee43..1b85360f996c 100644 --- a/python/graphscope/gsctl/commands/interactive/glob.py +++ b/python/graphscope/gsctl/commands/interactive/glob.py @@ -25,6 +25,7 @@ from graphscope.gsctl.impl import delete_job_by_id from graphscope.gsctl.impl import get_datasource_by_id from graphscope.gsctl.impl import get_graph_id_by_name +from graphscope.gsctl.impl import get_graph_name_by_id from graphscope.gsctl.impl import get_job_by_id from graphscope.gsctl.impl import list_graphs from graphscope.gsctl.impl import list_jobs @@ -76,16 +77,17 @@ def use(context, graph_identifier): """Switch to GRAPH context, see identifier with `ls` command""" try: graph_identifier = get_graph_id_by_name(graph_identifier) + graph_name = get_graph_name_by_id(graph_identifier) status = list_service_status() for s in status: if s.graph_id == graph_identifier and s.status != "Running": info(f"Starting service on graph {graph_identifier}...") start_service(graph_identifier) - switch_context(graph_identifier) + switch_context(graph_identifier, graph_name) except Exception as e: err(f"Failed to switch context: {str(e)}") else: - click.secho(f"Using GRAPH {graph_identifier}", fg="green") + click.secho(f"Using GRAPH {graph_name}", fg="green") @cli.command() diff --git a/python/graphscope/gsctl/commands/interactive/graph.py b/python/graphscope/gsctl/commands/interactive/graph.py index 640ff5a12d6f..a792931059e6 100644 --- a/python/graphscope/gsctl/commands/interactive/graph.py +++ b/python/graphscope/gsctl/commands/interactive/graph.py @@ -23,6 +23,7 @@ from graphscope.gsctl.impl import create_stored_procedure from graphscope.gsctl.impl import delete_stored_procedure_by_id from graphscope.gsctl.impl import get_datasource_by_id +from graphscope.gsctl.impl import get_graph_name_by_id from graphscope.gsctl.impl import list_graphs from graphscope.gsctl.impl import list_service_status from graphscope.gsctl.impl import list_stored_procedures @@ -199,6 +200,7 @@ def status(): # noqa: F811 def _construct_and_display_data(status): current_context = get_current_context() graph_identifier = current_context.context + graph_name = current_context.graph_name head = [ "STATUS", @@ -211,12 +213,14 @@ def _construct_and_display_data(status): for s in status: if s.graph_id == graph_identifier: if s.status == "Stopped": - data.append([s.status, s.graph_id, "-", "-", "-"]) + data.append( + [s.status, f"{graph_name}({s.graph_id})", "-", "-", "-"] + ) else: data.append( [ s.status, - s.graph_id, + f"{graph_name}({s.graph_id})", s.sdk_endpoints.cypher, s.sdk_endpoints.hqps, s.sdk_endpoints.gremlin, diff --git a/python/graphscope/gsctl/config.py b/python/graphscope/gsctl/config.py index 4eaa4dc3d566..067b8660a8ee 100644 --- a/python/graphscope/gsctl/config.py +++ b/python/graphscope/gsctl/config.py @@ -46,6 +46,7 @@ def __init__( name=None, timestamp=time.time(), context="global", + graph_name=None, ): if name is None: name = "context_" + "".join(random.choices(ascii_letters, k=8)) @@ -55,14 +56,19 @@ def __init__( self.coordinator_endpoint = coordinator_endpoint # switch to specific graph after `using graph` self.context = context + self.graph_name = graph_name self.timestamp = timestamp def switch_context(self, context: str): self.context = context + def set_graph_name(self, graph_name: str): + self.graph_name = graph_name + def to_dict(self) -> dict: return { "name": self.name, + "graph_name": str(self.graph_name), "flex": self.flex, "coordinator_endpoint": self.coordinator_endpoint, "context": self.context, @@ -77,6 +83,7 @@ def from_dict(cls, dikt): name=dikt.get("name"), timestamp=dikt.get("timestamp"), context=dikt.get("context"), + graph_name=dikt.get("graph_name", None), ) def is_expired(self, validity_period=86400) -> bool: diff --git a/python/graphscope/gsctl/impl/__init__.py b/python/graphscope/gsctl/impl/__init__.py index 12677c62629e..7fdd2ff4e37a 100644 --- a/python/graphscope/gsctl/impl/__init__.py +++ b/python/graphscope/gsctl/impl/__init__.py @@ -25,6 +25,7 @@ from graphscope.gsctl.impl.graph import create_graph from graphscope.gsctl.impl.graph import delete_graph_by_id from graphscope.gsctl.impl.graph import get_graph_id_by_name +from graphscope.gsctl.impl.graph import get_graph_name_by_id from graphscope.gsctl.impl.graph import list_graphs from graphscope.gsctl.impl.job import delete_job_by_id from graphscope.gsctl.impl.job import get_job_by_id diff --git a/python/graphscope/gsctl/impl/graph.py b/python/graphscope/gsctl/impl/graph.py index 06399e7968d3..269b4a56e9ca 100644 --- a/python/graphscope/gsctl/impl/graph.py +++ b/python/graphscope/gsctl/impl/graph.py @@ -70,3 +70,11 @@ def get_graph_id_by_name(name_or_id: str): f"Found multiple id candidates {id_candidate} for graph {name_or_id}, please choose one." ) return id_candidate[0] + + +def get_graph_name_by_id(graph_identifier: str): + graphs = list_graphs() + for g in graphs: + if g.id == graph_identifier: + return g.name + return graph_identifier diff --git a/python/graphscope/gsctl/impl/utils.py b/python/graphscope/gsctl/impl/utils.py index cd30821bddfe..977cbf38e755 100644 --- a/python/graphscope/gsctl/impl/utils.py +++ b/python/graphscope/gsctl/impl/utils.py @@ -30,8 +30,9 @@ def upload_file(location: str) -> str: return api_instance.upload_file(location).file_path -def switch_context(context: str): +def switch_context(context: str, graph_name=None): config = load_gs_config() current_context = get_current_context() current_context.switch_context(context) + current_context.set_graph_name(graph_name) config.update_and_write(current_context) From 6d201076105f40291638cd8104219998c3f4b556 Mon Sep 17 00:00:00 2001 From: Dongze Li Date: Thu, 13 Jun 2024 20:52:30 +0800 Subject: [PATCH 4/4] Add Y/N interaction when deleting resources --- python/graphscope/gsctl/commands/__init__.py | 6 ++++- .../gsctl/commands/interactive/glob.py | 18 +++++++------ .../gsctl/commands/interactive/graph.py | 22 ++++++++++++---- python/graphscope/gsctl/utils.py | 25 ++++++++++--------- 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/python/graphscope/gsctl/commands/__init__.py b/python/graphscope/gsctl/commands/__init__.py index 2ae55c6b1d56..437decd6ed22 100644 --- a/python/graphscope/gsctl/commands/__init__.py +++ b/python/graphscope/gsctl/commands/__init__.py @@ -108,7 +108,11 @@ def get_command_collection(context: Context): commands = click.CommandCollection(sources=[common, interactive]) else: if len(sys.argv) < 2 or sys.argv[1] != "use": - info(f"Using GRAPH {context.graph_name}.", fg="green", bold=True) + info( + f"Using GRAPH {context.graph_name}(id={context.context}).", + fg="green", + bold=True, + ) info("Run `gsctl use GLOBAL` to switch back to GLOBAL context.\n") commands = click.CommandCollection(sources=[common, interactive_graph]) elif is_insight_mode(context.flex): diff --git a/python/graphscope/gsctl/commands/interactive/glob.py b/python/graphscope/gsctl/commands/interactive/glob.py index 1b85360f996c..6edb74dcadba 100644 --- a/python/graphscope/gsctl/commands/interactive/glob.py +++ b/python/graphscope/gsctl/commands/interactive/glob.py @@ -81,13 +81,15 @@ def use(context, graph_identifier): status = list_service_status() for s in status: if s.graph_id == graph_identifier and s.status != "Running": - info(f"Starting service on graph {graph_identifier}...") + info( + f"Starting service on graph {graph_name}(id={graph_identifier})..." + ) start_service(graph_identifier) switch_context(graph_identifier, graph_name) except Exception as e: err(f"Failed to switch context: {str(e)}") else: - click.secho(f"Using GRAPH {graph_name}", fg="green") + click.secho(f"Using GRAPH {graph_name}(id={graph_identifier})", fg="green") @cli.command() @@ -144,11 +146,11 @@ def graph(filename): # noqa: F811 def graph(graph_identifier): # noqa: F811 """Delete a graph, see graph identifier with `ls` command""" try: - delete_graph_by_id(get_graph_id_by_name(graph_identifier)) + if click.confirm("Do you want to continue?"): + delete_graph_by_id(get_graph_id_by_name(graph_identifier)) + succ(f"Delete graph {graph_identifier} successfully.") except Exception as e: err(f"Failed to delete graph {graph_identifier}: {str(e)}") - else: - succ(f"Delete graph {graph_identifier} successfully.") @create.command @@ -255,11 +257,11 @@ def loaderjob(graph_identifier, filename): # noqa: F811 def job(identifier): # noqa: F811 """Cancel a job, see identifier with `ls` command""" try: - delete_job_by_id(identifier) + if click.confirm("Do you want to continue?"): + delete_job_by_id(identifier) + succ(f"Delete job {identifier} successfully.") except Exception as e: err(f"Failed to delete job {identifier}: {str(e)}") - else: - succ(f"Delete job {identifier} successfully.") @desc.command() diff --git a/python/graphscope/gsctl/commands/interactive/graph.py b/python/graphscope/gsctl/commands/interactive/graph.py index a792931059e6..ee2431b2a23e 100644 --- a/python/graphscope/gsctl/commands/interactive/graph.py +++ b/python/graphscope/gsctl/commands/interactive/graph.py @@ -27,6 +27,7 @@ from graphscope.gsctl.impl import list_graphs from graphscope.gsctl.impl import list_service_status from graphscope.gsctl.impl import list_stored_procedures +from graphscope.gsctl.impl import restart_service from graphscope.gsctl.impl import start_service from graphscope.gsctl.impl import stop_service from graphscope.gsctl.impl import switch_context @@ -131,11 +132,11 @@ def storedproc(identifier): # noqa: F811 current_context = get_current_context() graph_identifier = current_context.context try: - delete_stored_procedure_by_id(graph_identifier, identifier) + if click.confirm("Do you want to continue?"): + delete_stored_procedure_by_id(graph_identifier, identifier) + succ(f"Delete stored procedure {identifier} successfully.") except Exception as e: err(f"Failed to delete stored procedure: {str(e)}") - else: - succ(f"Delete stored procedure {identifier} successfully.") @desc.command() @@ -193,6 +194,17 @@ def start(): # noqa: F811 err(f"Failed to start service: {str(e)}") +@service.command +def restart(): # noqa: F811 + """Start current database service""" + try: + restart_service() + except Exception as e: + err(f"Failed to restart service: {str(e)}") + else: + succ("Service restarted.") + + @service.command def status(): # noqa: F811 """Display current service status""" @@ -214,13 +226,13 @@ def _construct_and_display_data(status): if s.graph_id == graph_identifier: if s.status == "Stopped": data.append( - [s.status, f"{graph_name}({s.graph_id})", "-", "-", "-"] + [s.status, f"{graph_name}(id={s.graph_id})", "-", "-", "-"] ) else: data.append( [ s.status, - f"{graph_name}({s.graph_id})", + f"{graph_name}(id={s.graph_id})", s.sdk_endpoints.cypher, s.sdk_endpoints.hqps, s.sdk_endpoints.gremlin, diff --git a/python/graphscope/gsctl/utils.py b/python/graphscope/gsctl/utils.py index f5fb39b894cb..f679cbc5630b 100644 --- a/python/graphscope/gsctl/utils.py +++ b/python/graphscope/gsctl/utils.py @@ -459,18 +459,19 @@ def create_datasource_mapping_node(self, graph, datasource_mapping): parent=specific_edge_mapping_identifier, ) # property mapping - for property_column_mapping in mapping.column_mappings: - tag = "Property(name: {0}) -> DataSourceColumn(index: {1}, name: {2})".format( - property_column_mapping.var_property, - property_column_mapping.column.index, - property_column_mapping.column.name, - ) - p_mapping_identifier = f"{specific_edge_mapping_identifier}_{property_column_mapping.var_property}" - self.tree.create_node( - tag=tag, - identifier=p_mapping_identifier, - parent=specific_edge_mapping_identifier, - ) + if mapping.column_mappings is not None: + for property_column_mapping in mapping.column_mappings: + tag = "Property(name: {0}) -> DataSourceColumn(index: {1}, name: {2})".format( + property_column_mapping.var_property, + property_column_mapping.column.index, + property_column_mapping.column.name, + ) + p_mapping_identifier = f"{specific_edge_mapping_identifier}_{property_column_mapping.var_property}" + self.tree.create_node( + tag=tag, + identifier=p_mapping_identifier, + parent=specific_edge_mapping_identifier, + ) def show(self, graph_identifier=None, stdout=False, sorting=False): if graph_identifier is not None: