Skip to content

Commit

Permalink
Add GremlinServiceAvailable alert rule
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Mar 6, 2024
1 parent 3173f31 commit 5865940
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 22 deletions.
63 changes: 61 additions & 2 deletions flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import socket

import psutil
from gremlin_python.driver.client import Client
from gs_flex_coordinator.core import client_wrapper
from gs_flex_coordinator.core.alert.alert_rule import AlertRule
from gs_flex_coordinator.core.alert.message_collector import AlertMessageCollector
from gs_flex_coordinator.core.alert.message_collector import \
AlertMessageCollector
from gs_flex_coordinator.core.config import CLUSTER_TYPE, SOLUTION

logger = logging.getLogger("graphscope")
Expand Down Expand Up @@ -74,6 +77,52 @@ def run_alert(self):
logger.warn("Failed to get disk usage: %s", str(e))


class GremlinServiceAvailableAlert(AlertRule):
def __init__(
self,
name,
severity,
metric_type,
conditions_description,
frequency,
message_collector,
enable=True,
):
super().__init__(
name,
severity,
metric_type,
conditions_description,
frequency,
message_collector,
enable,
)

def run_alert(self):
"""This function needs to handle exception by itself"""
try:
graph = client_wrapper.get_current_graph()
gremlin_interface = graph.gremlin_interface
client = Client(
gremlin_interface["gremlin_endpoint"],
"g",
username=gremlin_interface["username"],
password=gremlin_interface["password"],
)
client.submit("g.with('evaluationTimeout', 5000).V().limit(1)").all.result()
except Exception as e:
message = "Gremlin service unavailable: {0}".format(str(e))
# unable to distinguish whether frontend or excutor is unavailable,
# thus, we set the target "-"
alert_message = self.generate_alert_message("-", message)
self.alert(alert_message)
finally:
try:
client.close()
except: # noqa: E722
pass

Check notice on line 123 in flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py

View check run for this annotation

codefactor.io / CodeFactor

flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py#L122-L123

Try, Except, Pass detected. (B110)


def init_builtin_alert_rules(message_collector: AlertMessageCollector):
alert_rules = {}
# HighDiskUtilization
Expand All @@ -82,9 +131,19 @@ def init_builtin_alert_rules(message_collector: AlertMessageCollector):
severity="warning",
metric_type="node",
conditions_description="disk_utilization>80",
frequency=1,
frequency=180,
message_collector=message_collector,
threshold=80,
enable=True,
)
if SOLUTION == "GRAPHSCOPE_INSIGHT":
alert_rules["GremlinServiceAvailable"] = GremlinServiceAvailableAlert(
name="GremlinServiceAvailable",
severity="emergency",
metric_type="service",
conditions_description="g.V().limit(1) failed",
frequency=5,
message_collector=message_collector,
enable=True,
)
return alert_rules
18 changes: 11 additions & 7 deletions flex/coordinator/gs_flex_coordinator/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from typing import List, Union

import psutil

from gs_flex_coordinator.core.config import (CLUSTER_TYPE, CREATION_TIME,
DATASET_WORKSPACE, INSTANCE_NAME,
SOLUTION, WORKSPACE)
Expand All @@ -37,11 +36,11 @@
encode_datetime, get_current_time)
from gs_flex_coordinator.models import (DataSource, DeploymentInfo,
EdgeDataSource, EdgeType, Graph,
GrootGraph, GrootSchema, JobStatus,
ModelSchema, NodeStatus, Procedure,
SchemaMapping, ServiceStatus,
StartServiceRequest, VertexDataSource,
VertexType, GrootDataloadingJobConfig)
GrootDataloadingJobConfig, GrootGraph,
GrootSchema, JobStatus, ModelSchema,
NodeStatus, Procedure, SchemaMapping,
ServiceStatus, StartServiceRequest,
VertexDataSource, VertexType)
from gs_flex_coordinator.version import __version__

logger = logging.getLogger("graphscope")
Expand Down Expand Up @@ -126,6 +125,9 @@ def get_groot_schema(self, graph_name: str) -> GrootSchema:
def import_groot_schema(self, graph_name: str, schema: GrootSchema) -> str:
return self._client.import_groot_schema(graph_name, schema.to_dict())

def get_current_graph(self) -> GrootGraph:
return self._client.get_current_graph()

def create_graph(self, graph: Graph) -> str:
# there are some tricks here, since schema is a keyword of openapi
# specification, so it will be converted into the _schema field.
Expand Down Expand Up @@ -282,7 +284,9 @@ def upload_file(self, filestorage) -> str:
def create_groot_dataloading_job(
self, graph_name: str, job_config: GrootDataloadingJobConfig
) -> str:
job_id = self._client.create_groot_dataloading_job(graph_name, job_config.to_dict())
job_id = self._client.create_groot_dataloading_job(
graph_name, job_config.to_dict()
)
return job_id

def list_groot_graph(self) -> List[GrootGraph]:
Expand Down
11 changes: 2 additions & 9 deletions flex/coordinator/gs_flex_coordinator/core/insight/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
from graphscope.deploy.kubernetes.utils import (get_service_endpoints,
resolve_api_client)
from gremlin_python.driver.client import Client
from kubernetes import client as kube_client
from kubernetes import config as kube_config

from gs_flex_coordinator.core.config import (CLUSTER_TYPE, CREATION_TIME,
ENABLE_DNS, GROOT_GREMLIN_PORT,
GROOT_GRPC_PORT, GROOT_PASSWORD,
Expand All @@ -39,6 +36,8 @@
encode_datetime, get_internal_ip,
get_public_ip)
from gs_flex_coordinator.version import __version__
from kubernetes import client as kube_client
from kubernetes import config as kube_config

logger = logging.getLogger("graphscope")

Expand Down Expand Up @@ -283,12 +282,6 @@ def get_groot_graph_from_local():
client.submit(
"g.with('evaluationTimeout', 5000).V().limit(1)"
).all().result()
print(
"DEBUG: ",
client.submit("g.with('evaluationTimeout', 5000).V().valueMap().limit(10)")
.all()
.result(),
)
except Exception as e:
pass

Check notice on line 286 in flex/coordinator/gs_flex_coordinator/core/insight/graph.py

View check run for this annotation

codefactor.io / CodeFactor

flex/coordinator/gs_flex_coordinator/core/insight/graph.py#L285-L286

Try, Except, Pass detected. (B110)
else:
Expand Down
3 changes: 3 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/insight/groot.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def get_edge_full_label(
) -> str:
return f"{source_vertex_type}_{type_name}_{destination_vertex_type}"

def get_current_graph(self):
return self._graph

def list_groot_graph(self) -> list:
rlts = [self._graph.to_dict()]
return rlts
Expand Down
Loading

0 comments on commit 5865940

Please sign in to comment.