Skip to content

Commit

Permalink
Add graphs info in deployment interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Feb 19, 2024
1 parent 6443546 commit 553224a
Show file tree
Hide file tree
Showing 13 changed files with 469 additions and 15 deletions.
1 change: 1 addition & 0 deletions flex/coordinator/.openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gs_flex_coordinator/models/column_mapping.py
gs_flex_coordinator/models/connection.py
gs_flex_coordinator/models/connection_status.py
gs_flex_coordinator/models/deployment_info.py
gs_flex_coordinator/models/deployment_info_graphs_info_value.py
gs_flex_coordinator/models/deployment_status.py
gs_flex_coordinator/models/edge_mapping.py
gs_flex_coordinator/models/edge_mapping_destination_vertex_mappings_inner.py
Expand Down
90 changes: 81 additions & 9 deletions flex/coordinator/gs_flex_coordinator/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,40 @@
import datetime
import itertools
import logging
import os
import pickle
import socket
import threading
from typing import List, Union

import psutil
from gs_flex_coordinator.core.config import (CLUSTER_TYPE, INSTANCE_NAME,
SOLUTION)

from gs_flex_coordinator.core.config import (
CLUSTER_TYPE,
COORDINATOR_STARTING_TIME,
INSTANCE_NAME,
SOLUTION,
WORKSPACE,
)
from gs_flex_coordinator.core.interactive import init_hqps_client
from gs_flex_coordinator.core.utils import encode_datetime
from gs_flex_coordinator.models import (DeploymentInfo, Graph, JobStatus,
ModelSchema, NodeStatus, Procedure,
SchemaMapping, ServiceStatus,
StartServiceRequest)
from gs_flex_coordinator.core.scheduler import schedule
from gs_flex_coordinator.core.utils import (
GraphInfo,
decode_datetimestr,
encode_datetime,
get_current_time,
)
from gs_flex_coordinator.models import (
DeploymentInfo,
Graph,
JobStatus,
ModelSchema,
NodeStatus,
Procedure,
SchemaMapping,
ServiceStatus,
StartServiceRequest,
)
from gs_flex_coordinator.version import __version__

logger = logging.getLogger("graphscope")
Expand All @@ -45,6 +66,34 @@ def __init__(self):
self._lock = threading.RLock()
# initialize specific client
self._client = self._initialize_client()
# graphs info
self._graphs_info = {}
# pickle path
self._pickle_path = os.path.join(WORKSPACE, "graphs_info.pickle")
# recover
self._try_to_recover_from_disk()

def _try_to_recover_from_disk(self):
try:
if os.path.exists(self._pickle_path):
logger.info("Recover graphs info from file %s", self._pickle_path)
with open(self._pickle_path, "rb") as f:
self._graphs_info = pickle.load(f)
except Exception as e:
logger.warn("Failed to recover graphs info: %s", str(e))
# set default graph info
for g in self.list_graphs():
if g.name not in self._graphs_info:
self._graphs_info[g.name] = GraphInfo(
name=g.name, creation_time=COORDINATOR_STARTING_TIME
)

def _pickle_graphs_info_impl(self):
try:
with open(self._pickle_path, "wb") as f:
pickle.dump(self._graphs_info, f)
except Exception as e:
logger.warn("Failed to dump graphs info: %s", str(e))

def _initialize_client(self):
service_initializer = {"INTERACTIVE": init_hqps_client}
Expand Down Expand Up @@ -73,10 +122,18 @@ def create_graph(self, graph: Graph) -> str:
graph_dict = graph.to_dict()
if "_schema" in graph_dict:
graph_dict["schema"] = graph_dict.pop("_schema")
return self._client.create_graph(graph_dict)
rlt = self._client.create_graph(graph_dict)
self._graphs_info[graph.name] = GraphInfo(
name=graph.name, creation_time=get_current_time()
)
self._pickle_graphs_info_impl()
return rlt

def delete_graph_by_name(self, graph_name: str) -> str:
return self._client.delete_graph_by_name(graph_name)
rlt = self._client.delete_graph_by_name(graph_name)
del self._graphs_info[graph_name]
self._pickle_graphs_info_impl()
return rlt

def create_procedure(self, graph_name: str, procedure: Procedure) -> str:
procedure_dict = procedure.to_dict()
Expand Down Expand Up @@ -111,10 +168,25 @@ def get_node_status(self) -> List[NodeStatus]:
return rlt

def get_deployment_info(self) -> DeploymentInfo:
# update graphs info
for job in self.list_jobs():
if (
job.detail["graph_name"] in self._graphs_info
and job.end_time is not None
):
self._graphs_info[job.detail["graph_name"]].last_dataloading_time = (
decode_datetimestr(job.end_time)
)
self._pickle_graphs_info_impl()
graphs_info = {}
for name, info in self._graphs_info.items():
graphs_info[name] = info.to_dict()
info = {
"name": INSTANCE_NAME,
"cluster_type": CLUSTER_TYPE,
"version": __version__,
"solution": SOLUTION,
"graphs_info": graphs_info,
}
return DeploymentInfo.from_dict(info)

Expand Down
5 changes: 5 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# limitations under the License.
#

import datetime
import logging
import os
import tempfile
Expand Down Expand Up @@ -79,3 +80,7 @@ def config_logging(log_level: str):

# interactive configuration
HQPS_ADMIN_SERVICE_PORT = os.environ.get("HIACTOR_ADMIN_SERVICE_PORT", 7777)


# coordinator starting time
COORDINATOR_STARTING_TIME = datetime.datetime.now()
54 changes: 53 additions & 1 deletion flex/coordinator/gs_flex_coordinator/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import functools
import logging
import random
import requests
import socket
import string
from typing import Union

import requests

logger = logging.getLogger("graphscope")


Expand Down Expand Up @@ -74,6 +75,10 @@ def random_string(nlen):
return "".join([random.choice(string.ascii_lowercase) for _ in range(nlen)])


def get_current_time() -> datetime.datetime:
return datetime.datetime.now()


def str_to_bool(s):
if isinstance(s, bool):
return s
Expand All @@ -97,3 +102,50 @@ def get_public_ip() -> Union[str, None]:
except requests.exceptions.RequestException as e:
logger.warn("Failed to get public ip: %s", str(e))
return None


class GraphInfo(object):
def __init__(
self, name, creation_time, update_time=None, last_dataloading_time=None
):
self._name = name
self._creation_time = creation_time
self._update_time = update_time
if self._update_time is None:
self._update_time = self._creation_time
self._last_dataloading_time = last_dataloading_time

@property
def name(self):
return self._name

@property
def creation_time(self):
return self._creation_time

@property
def update_time(self):
return self._update_time

@property
def last_dataloading_time(self):
return self._last_dataloading_time

@update_time.setter
def update_time(self, new_time):
self._update_time = new_time

@last_dataloading_time.setter
def last_dataloading_time(self, new_time):
if self._last_dataloading_time is None:
self._last_dataloading_time = new_time
elif new_time > self._last_dataloading_time:
self._last_dataloading_time = new_time

def to_dict(self):
return {
"name": self._name,
"creation_time": encode_datetime(self._creation_time),
"update_time": encode_datetime(self._update_time),
"last_dataloading_time": encode_datetime(self._last_dataloading_time),
}
1 change: 1 addition & 0 deletions flex/coordinator/gs_flex_coordinator/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from gs_flex_coordinator.models.connection import Connection
from gs_flex_coordinator.models.connection_status import ConnectionStatus
from gs_flex_coordinator.models.deployment_info import DeploymentInfo
from gs_flex_coordinator.models.deployment_info_graphs_info_value import DeploymentInfoGraphsInfoValue
from gs_flex_coordinator.models.deployment_status import DeploymentStatus
from gs_flex_coordinator.models.edge_mapping import EdgeMapping
from gs_flex_coordinator.models.edge_mapping_destination_vertex_mappings_inner import EdgeMappingDestinationVertexMappingsInner
Expand Down
34 changes: 31 additions & 3 deletions flex/coordinator/gs_flex_coordinator/models/deployment_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
from typing import List, Dict # noqa: F401

from gs_flex_coordinator.models.base_model import Model
from gs_flex_coordinator.models.deployment_info_graphs_info_value import DeploymentInfoGraphsInfoValue
from gs_flex_coordinator import util

from gs_flex_coordinator.models.deployment_info_graphs_info_value import DeploymentInfoGraphsInfoValue # noqa: E501

class DeploymentInfo(Model):
"""NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
Do not edit the class manually.
"""

def __init__(self, name=None, cluster_type=None, version=None, solution=None): # noqa: E501
def __init__(self, name=None, cluster_type=None, version=None, solution=None, graphs_info=None): # noqa: E501
"""DeploymentInfo - a model defined in OpenAPI
:param name: The name of this DeploymentInfo. # noqa: E501
Expand All @@ -23,25 +25,30 @@ def __init__(self, name=None, cluster_type=None, version=None, solution=None):
:type version: str
:param solution: The solution of this DeploymentInfo. # noqa: E501
:type solution: str
:param graphs_info: The graphs_info of this DeploymentInfo. # noqa: E501
:type graphs_info: Dict[str, DeploymentInfoGraphsInfoValue]
"""
self.openapi_types = {
'name': str,
'cluster_type': str,
'version': str,
'solution': str
'solution': str,
'graphs_info': Dict[str, DeploymentInfoGraphsInfoValue]
}

self.attribute_map = {
'name': 'name',
'cluster_type': 'cluster_type',
'version': 'version',
'solution': 'solution'
'solution': 'solution',
'graphs_info': 'graphs_info'
}

self._name = name
self._cluster_type = cluster_type
self._version = version
self._solution = solution
self._graphs_info = graphs_info

@classmethod
def from_dict(cls, dikt) -> 'DeploymentInfo':
Expand Down Expand Up @@ -149,3 +156,24 @@ def solution(self, solution: str):
)

self._solution = solution

@property
def graphs_info(self) -> Dict[str, DeploymentInfoGraphsInfoValue]:
"""Gets the graphs_info of this DeploymentInfo.
:return: The graphs_info of this DeploymentInfo.
:rtype: Dict[str, DeploymentInfoGraphsInfoValue]
"""
return self._graphs_info

@graphs_info.setter
def graphs_info(self, graphs_info: Dict[str, DeploymentInfoGraphsInfoValue]):
"""Sets the graphs_info of this DeploymentInfo.
:param graphs_info: The graphs_info of this DeploymentInfo.
:type graphs_info: Dict[str, DeploymentInfoGraphsInfoValue]
"""

self._graphs_info = graphs_info
Loading

0 comments on commit 553224a

Please sign in to comment.