Skip to content

Commit

Permalink
Return metadata when uploading file
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Jun 13, 2024
1 parent 9cbbbc3 commit 40b3e8c
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 15 deletions.
11 changes: 7 additions & 4 deletions coordinator/gscoordinator/flex/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from gscoordinator.flex.core.insight import init_groot_client
from gscoordinator.flex.core.interactive import init_hqps_client
from gscoordinator.flex.core.utils import encode_datetime
from gscoordinator.flex.core.utils import parse_file_metadata
from gscoordinator.flex.models import CreateDataloadingJobResponse
from gscoordinator.flex.models import CreateEdgeType
from gscoordinator.flex.models import CreateGraphRequest
Expand Down Expand Up @@ -289,7 +290,8 @@ def upload_file(self, filestorage) -> str:
if CLUSTER_TYPE == "HOSTS":
filepath = os.path.join(DATASET_WORKSPACE, filestorage.filename)
filestorage.save(filepath)
return UploadFileResponse.from_dict({"file_path": filepath})
metadata = parse_file_metadata(filepath)
return UploadFileResponse.from_dict({"file_path": filepath, "metadata": metadata})

def bind_datasource_in_batch(
self, graph_id: str, schema_mapping: SchemaMapping
Expand All @@ -301,9 +303,10 @@ def bind_datasource_in_batch(
schema_mapping_dict["vertex_mappings"],
schema_mapping_dict["edge_mappings"],
):
for column_mapping in mapping["column_mappings"]:
if "_property" in column_mapping:
column_mapping["property"] = column_mapping.pop("_property")
if "column_mappings" in mapping and mapping["column_mappings"] is not None:
for column_mapping in mapping["column_mappings"]:
if "_property" in column_mapping:
column_mapping["property"] = column_mapping.pop("_property")
if (
"source_vertex_mappings" in mapping
and "destination_vertex_mappings" in mapping
Expand Down
24 changes: 24 additions & 0 deletions coordinator/gscoordinator/flex/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import datetime
import functools
import logging
import os
import random
import re
import socket
import string
from typing import Union
Expand Down Expand Up @@ -83,6 +85,28 @@ def get_internal_ip() -> str:
return internal_ip


def parse_file_metadata(location: str) -> dict:
"""
Args:
location: optional values:
odps://path/to/file, hdfs://path/to/file, file:///path/to/file
/home/graphscope/path/to/file
"""
metadata = {"datasource": "file"}
path = location
pattern = r"^(odps|hdfs|file|oss|s3)?://([\w/.-]+)$"
match = re.match(pattern, location)
if match:
datasource = match.group(1)
metadata["datasource"] = datasource
if datasource == "file":
path = match.group(2)
if metadata["datasource"] == "file":
_, file_extension = os.path.splitext(path)
metadata["file_type"] = file_extension[1:]
return metadata


def get_public_ip() -> Union[str, None]:
try:
response = requests.get("https://api.ipify.org?format=json")
Expand Down
34 changes: 31 additions & 3 deletions coordinator/gscoordinator/flex/models/upload_file_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,26 @@ class UploadFileResponse(Model):
Do not edit the class manually.
"""

def __init__(self, file_path=None): # noqa: E501
def __init__(self, file_path=None, metadata=None): # noqa: E501
"""UploadFileResponse - a model defined in OpenAPI
:param file_path: The file_path of this UploadFileResponse. # noqa: E501
:type file_path: str
:param metadata: The metadata of this UploadFileResponse. # noqa: E501
:type metadata: Dict[str, object]
"""
self.openapi_types = {
'file_path': str
'file_path': str,
'metadata': Dict[str, object]
}

self.attribute_map = {
'file_path': 'file_path'
'file_path': 'file_path',
'metadata': 'metadata'
}

self._file_path = file_path
self._metadata = metadata

@classmethod
def from_dict(cls, dikt) -> 'UploadFileResponse':
Expand Down Expand Up @@ -61,3 +66,26 @@ def file_path(self, file_path: str):
raise ValueError("Invalid value for `file_path`, must not be `None`") # noqa: E501

self._file_path = file_path

@property
def metadata(self) -> Dict[str, object]:
"""Gets the metadata of this UploadFileResponse.
:return: The metadata of this UploadFileResponse.
:rtype: Dict[str, object]
"""
return self._metadata

@metadata.setter
def metadata(self, metadata: Dict[str, object]):
"""Sets the metadata of this UploadFileResponse.
:param metadata: The metadata of this UploadFileResponse.
:type metadata: Dict[str, object]
"""
if metadata is None:
raise ValueError("Invalid value for `metadata`, must not be `None`") # noqa: E501

self._metadata = metadata
12 changes: 12 additions & 0 deletions coordinator/gscoordinator/flex/openapi/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,11 @@ paths:
"200":
content:
application/json:
example:
file_path: /home/graphscope/path/to/file.csv
metadata:
datasource: file
file_type: csv
schema:
$ref: '#/components/schemas/UploadFileResponse'
description: successful operation
Expand Down Expand Up @@ -2008,12 +2013,19 @@ components:
UploadFileResponse:
example:
file_path: file_path
metadata:
key: ""
properties:
file_path:
title: file_path
type: string
metadata:
additionalProperties: true
title: metadata
type: object
required:
- file_path
- metadata
title: UploadFileResponse
LongText:
properties:
Expand Down
9 changes: 9 additions & 0 deletions flex/openapi/openapi_coordinator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ components:
UploadFileResponse:
required:
- file_path
- metadata
properties:
file_path:
type: string
metadata:
type: object
additionalProperties: true

LongText:
required:
Expand Down Expand Up @@ -2297,5 +2301,10 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/UploadFileResponse'
example:
file_path: /home/graphscope/path/to/file.csv
metadata:
datasource: file
file_type: csv
500:
$ref: "#/components/responses/500"
6 changes: 4 additions & 2 deletions python/graphscope/flex/rest/models/upload_file_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class UploadFileResponse(BaseModel):
UploadFileResponse
""" # noqa: E501
file_path: StrictStr
__properties: ClassVar[List[str]] = ["file_path"]
metadata: Dict[str, Any]
__properties: ClassVar[List[str]] = ["file_path", "metadata"]

model_config = {
"populate_by_name": True,
Expand Down Expand Up @@ -81,7 +82,8 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
return cls.model_validate(obj)

_obj = cls.model_validate({
"file_path": obj.get("file_path")
"file_path": obj.get("file_path"),
"metadata": obj.get("metadata")
})
return _obj

Expand Down
2 changes: 1 addition & 1 deletion python/graphscope/gsctl/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def get_command_collection(context: Context):
commands = click.CommandCollection(sources=[common, interactive])
else:
if len(sys.argv) < 2 or sys.argv[1] != "use":
info(f"Using GRAPH {context.context}.", fg="green", bold=True)
info(f"Using GRAPH {context.graph_name}.", fg="green", bold=True)
info("Run `gsctl use GLOBAL` to switch back to GLOBAL context.\n")
commands = click.CommandCollection(sources=[common, interactive_graph])
elif is_insight_mode(context.flex):
Expand Down
6 changes: 4 additions & 2 deletions python/graphscope/gsctl/commands/interactive/glob.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from graphscope.gsctl.impl import delete_job_by_id
from graphscope.gsctl.impl import get_datasource_by_id
from graphscope.gsctl.impl import get_graph_id_by_name
from graphscope.gsctl.impl import get_graph_name_by_id
from graphscope.gsctl.impl import get_job_by_id
from graphscope.gsctl.impl import list_graphs
from graphscope.gsctl.impl import list_jobs
Expand Down Expand Up @@ -76,16 +77,17 @@ def use(context, graph_identifier):
"""Switch to GRAPH context, see identifier with `ls` command"""
try:
graph_identifier = get_graph_id_by_name(graph_identifier)
graph_name = get_graph_name_by_id(graph_identifier)
status = list_service_status()
for s in status:
if s.graph_id == graph_identifier and s.status != "Running":
info(f"Starting service on graph {graph_identifier}...")
start_service(graph_identifier)
switch_context(graph_identifier)
switch_context(graph_identifier, graph_name)
except Exception as e:
err(f"Failed to switch context: {str(e)}")
else:
click.secho(f"Using GRAPH {graph_identifier}", fg="green")
click.secho(f"Using GRAPH {graph_name}", fg="green")


@cli.command()
Expand Down
8 changes: 6 additions & 2 deletions python/graphscope/gsctl/commands/interactive/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from graphscope.gsctl.impl import create_stored_procedure
from graphscope.gsctl.impl import delete_stored_procedure_by_id
from graphscope.gsctl.impl import get_datasource_by_id
from graphscope.gsctl.impl import get_graph_name_by_id
from graphscope.gsctl.impl import list_graphs
from graphscope.gsctl.impl import list_service_status
from graphscope.gsctl.impl import list_stored_procedures
Expand Down Expand Up @@ -199,6 +200,7 @@ def status(): # noqa: F811
def _construct_and_display_data(status):
current_context = get_current_context()
graph_identifier = current_context.context
graph_name = current_context.graph_name

head = [
"STATUS",
Expand All @@ -211,12 +213,14 @@ def _construct_and_display_data(status):
for s in status:
if s.graph_id == graph_identifier:
if s.status == "Stopped":
data.append([s.status, s.graph_id, "-", "-", "-"])
data.append(
[s.status, f"{graph_name}({s.graph_id})", "-", "-", "-"]
)
else:
data.append(
[
s.status,
s.graph_id,
f"{graph_name}({s.graph_id})",
s.sdk_endpoints.cypher,
s.sdk_endpoints.hqps,
s.sdk_endpoints.gremlin,
Expand Down
7 changes: 7 additions & 0 deletions python/graphscope/gsctl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
name=None,
timestamp=time.time(),
context="global",
graph_name=None,
):
if name is None:
name = "context_" + "".join(random.choices(ascii_letters, k=8))
Expand All @@ -55,14 +56,19 @@ def __init__(
self.coordinator_endpoint = coordinator_endpoint
# switch to specific graph after `using graph`
self.context = context
self.graph_name = graph_name
self.timestamp = timestamp

def switch_context(self, context: str):
self.context = context

def set_graph_name(self, graph_name: str):
self.graph_name = graph_name

def to_dict(self) -> dict:
return {
"name": self.name,
"graph_name": str(self.graph_name),
"flex": self.flex,
"coordinator_endpoint": self.coordinator_endpoint,
"context": self.context,
Expand All @@ -77,6 +83,7 @@ def from_dict(cls, dikt):
name=dikt.get("name"),
timestamp=dikt.get("timestamp"),
context=dikt.get("context"),
graph_name=dikt.get("graph_name", None),
)

def is_expired(self, validity_period=86400) -> bool:
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/gsctl/impl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from graphscope.gsctl.impl.graph import create_graph
from graphscope.gsctl.impl.graph import delete_graph_by_id
from graphscope.gsctl.impl.graph import get_graph_id_by_name
from graphscope.gsctl.impl.graph import get_graph_name_by_id
from graphscope.gsctl.impl.graph import list_graphs
from graphscope.gsctl.impl.job import delete_job_by_id
from graphscope.gsctl.impl.job import get_job_by_id
Expand Down
8 changes: 8 additions & 0 deletions python/graphscope/gsctl/impl/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,11 @@ def get_graph_id_by_name(name_or_id: str):
f"Found multiple id candidates {id_candidate} for graph {name_or_id}, please choose one."
)
return id_candidate[0]


def get_graph_name_by_id(graph_identifier: str):
graphs = list_graphs()
for g in graphs:
if g.id == graph_identifier:
return g.name
return graph_identifier
3 changes: 2 additions & 1 deletion python/graphscope/gsctl/impl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ def upload_file(location: str) -> str:
return api_instance.upload_file(location).file_path


def switch_context(context: str):
def switch_context(context: str, graph_name=None):
config = load_gs_config()
current_context = get_current_context()
current_context.switch_context(context)
current_context.set_graph_name(graph_name)
config.update_and_write(current_context)

0 comments on commit 40b3e8c

Please sign in to comment.