Skip to content

Commit

Permalink
Impl file upload interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Feb 19, 2024
1 parent 553224a commit 10d3faf
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
from gs_flex_coordinator.core import handle_api_exception
from gs_flex_coordinator import util

def upload_file(body): # noqa: E501

@handle_api_exception()
def upload_file(filestorage=None): # noqa: E501
"""upload_file
# noqa: E501
:param body:
:type body: str
:param filestorage:
:type filestorage: str
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.upload_file(body)
return client_wrapper.upload_file(filestorage)
42 changes: 15 additions & 27 deletions flex/coordinator/gs_flex_coordinator/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,18 @@
from typing import List, Union

import psutil

from gs_flex_coordinator.core.config import (
CLUSTER_TYPE,
COORDINATOR_STARTING_TIME,
INSTANCE_NAME,
SOLUTION,
WORKSPACE,
)
from gs_flex_coordinator.core.config import (CLUSTER_TYPE,
COORDINATOR_STARTING_TIME,
DATASET_WORKSPACE, INSTANCE_NAME,
SOLUTION, WORKSPACE)
from gs_flex_coordinator.core.interactive import init_hqps_client
from gs_flex_coordinator.core.scheduler import schedule
from gs_flex_coordinator.core.utils import (
GraphInfo,
decode_datetimestr,
encode_datetime,
get_current_time,
)
from gs_flex_coordinator.models import (
DeploymentInfo,
Graph,
JobStatus,
ModelSchema,
NodeStatus,
Procedure,
SchemaMapping,
ServiceStatus,
StartServiceRequest,
)
from gs_flex_coordinator.core.utils import (GraphInfo, decode_datetimestr,
encode_datetime, get_current_time)
from gs_flex_coordinator.models import (DeploymentInfo, Graph, JobStatus,
ModelSchema, NodeStatus, Procedure,
SchemaMapping, ServiceStatus,
StartServiceRequest)
from gs_flex_coordinator.version import __version__

logger = logging.getLogger("graphscope")
Expand Down Expand Up @@ -231,8 +216,11 @@ def create_dataloading_job(
job_id = self._client.create_dataloading_job(graph_name, schema_mapping_dict)
return job_id

def upload_file(self, body: bytes) -> str:
return "xxx"
def upload_file(self, filestorage) -> str:
if CLUSTER_TYPE == "HOSTS":
filepath = os.path.join(DATASET_WORKSPACE, filestorage.filename)
filestorage.save(filepath)
return str(filepath)


client_wrapper = ClientWrapper()
5 changes: 5 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def config_logging(log_level: str):
os.makedirs(ALERT_WORKSPACE, exist_ok=True)


# dataset workspace
DATASET_WORKSPACE = os.path.join(WORKSPACE, "dataset")
os.makedirs(DATASET_WORKSPACE, exist_ok=True)


# we use the solution encompasses the various applications and use cases of the
# product across different industries and business scenarios, e.g. "INTERACTIVE",
# "GRAPHSCOPE INSIGHT".
Expand Down
11 changes: 8 additions & 3 deletions flex/coordinator/gs_flex_coordinator/openapi/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,9 @@ paths:
operationId: upload_file
requestBody:
content:
application/octet-stream:
multipart/form-data:
schema:
format: binary
type: string
$ref: '#/components/schemas/upload_file_request'
required: true
responses:
"200":
Expand Down Expand Up @@ -2126,6 +2125,12 @@ components:
type: boolean
title: update_alert_messages_request
type: object
upload_file_request:
properties:
filestorage:
format: binary
type: string
type: object
Graph_stored_procedures:
example:
directory: plugins
Expand Down
10 changes: 6 additions & 4 deletions flex/openapi/openapi_coordinator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,13 @@ paths:
requestBody:
required: true
content:
application/octet-stream:
multipart/form-data:
schema:
# a binary file of any type
type: string
format: binary
type: object
properties:
filestorage:
type: string
format: binary
responses:
'200':
description: successful operation
Expand Down
39 changes: 17 additions & 22 deletions python/graphscope/flex/rest/api/utils_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from typing_extensions import Annotated

from pydantic import StrictBytes, StrictStr
from typing import Union
from typing import Optional, Union

from graphscope.flex.rest.api_client import ApiClient, RequestSerialized
from graphscope.flex.rest.api_response import ApiResponse
Expand All @@ -41,7 +41,7 @@ def __init__(self, api_client=None) -> None:
@validate_call
def upload_file(
self,
body: Union[StrictBytes, StrictStr],
filestorage: Optional[Union[StrictBytes, StrictStr]] = None,
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Expand All @@ -58,8 +58,8 @@ def upload_file(
"""upload_file
:param body: (required)
:type body: bytearray
:param filestorage:
:type filestorage: bytearray
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
Expand All @@ -83,7 +83,7 @@ def upload_file(
""" # noqa: E501

_param = self._upload_file_serialize(
body=body,
filestorage=filestorage,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
Expand All @@ -107,7 +107,7 @@ def upload_file(
@validate_call
def upload_file_with_http_info(
self,
body: Union[StrictBytes, StrictStr],
filestorage: Optional[Union[StrictBytes, StrictStr]] = None,
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Expand All @@ -124,8 +124,8 @@ def upload_file_with_http_info(
"""upload_file
:param body: (required)
:type body: bytearray
:param filestorage:
:type filestorage: bytearray
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
Expand All @@ -149,7 +149,7 @@ def upload_file_with_http_info(
""" # noqa: E501

_param = self._upload_file_serialize(
body=body,
filestorage=filestorage,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
Expand All @@ -173,7 +173,7 @@ def upload_file_with_http_info(
@validate_call
def upload_file_without_preload_content(
self,
body: Union[StrictBytes, StrictStr],
filestorage: Optional[Union[StrictBytes, StrictStr]] = None,
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Expand All @@ -190,8 +190,8 @@ def upload_file_without_preload_content(
"""upload_file
:param body: (required)
:type body: bytearray
:param filestorage:
:type filestorage: bytearray
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
Expand All @@ -215,7 +215,7 @@ def upload_file_without_preload_content(
""" # noqa: E501

_param = self._upload_file_serialize(
body=body,
filestorage=filestorage,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
Expand All @@ -234,7 +234,7 @@ def upload_file_without_preload_content(

def _upload_file_serialize(
self,
body,
filestorage,
_request_auth,
_content_type,
_headers,
Expand All @@ -257,14 +257,9 @@ def _upload_file_serialize(
# process the query parameters
# process the header parameters
# process the form parameters
if filestorage is not None:
_files['filestorage'] = filestorage
# process the body parameter
if body is not None:
# convert to byte array if the input is a file name (str)
if isinstance(body, str):
with open(body, "rb") as _fp:
_body_params = _fp.read()
else:
_body_params = body


# set the HTTP header `Accept`
Expand All @@ -281,7 +276,7 @@ def _upload_file_serialize(
_default_content_type = (
self.api_client.select_header_content_type(
[
'application/octet-stream'
'multipart/form-data'
]
)
)
Expand Down
3 changes: 1 addition & 2 deletions python/graphscope/gsctl/impl/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def create_dataloading_job(graph_name: str, job_config: dict) -> str:
content = f.read()
path_after_uploaded = upload_file(filename, content, location)
mapping["inputs"][index] = path_after_uploaded
print('!!!!!!!!: ', job_config)
# create job
context = get_current_context()
with graphscope.flex.rest.ApiClient(
Expand All @@ -87,4 +86,4 @@ def upload_file(filename: str, content: bytes, location: str) -> str:
graphscope.flex.rest.Configuration(context.coordinator_endpoint)
) as api_client:
api_instance = graphscope.flex.rest.UtilsApi(api_client)
return api_instance.upload_file(content)
return api_instance.upload_file(location)

0 comments on commit 10d3faf

Please sign in to comment.