From 5865940cd8d47d104307713acb1a32e457e5fcde Mon Sep 17 00:00:00 2001 From: Dongze Li Date: Wed, 6 Mar 2024 16:59:03 +0800 Subject: [PATCH] Add GremlinServiceAvailable alert rule --- .../core/alert/builtin_rules.py | 63 ++++- .../core/client_wrapper.py | 18 +- .../gs_flex_coordinator/core/insight/graph.py | 11 +- .../gs_flex_coordinator/core/insight/groot.py | 3 + python/graphscope/gsctl/commands/insight.py | 263 +++++++++++++++++- 5 files changed, 336 insertions(+), 22 deletions(-) diff --git a/flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py b/flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py index 27c0160df19d..ad69b4af8610 100644 --- a/flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py +++ b/flex/coordinator/gs_flex_coordinator/core/alert/builtin_rules.py @@ -21,8 +21,11 @@ import socket import psutil +from gremlin_python.driver.client import Client +from gs_flex_coordinator.core import client_wrapper from gs_flex_coordinator.core.alert.alert_rule import AlertRule -from gs_flex_coordinator.core.alert.message_collector import AlertMessageCollector +from gs_flex_coordinator.core.alert.message_collector import \ + AlertMessageCollector from gs_flex_coordinator.core.config import CLUSTER_TYPE, SOLUTION logger = logging.getLogger("graphscope") @@ -74,6 +77,52 @@ def run_alert(self): logger.warn("Failed to get disk usage: %s", str(e)) +class GremlinServiceAvailableAlert(AlertRule): + def __init__( + self, + name, + severity, + metric_type, + conditions_description, + frequency, + message_collector, + enable=True, + ): + super().__init__( + name, + severity, + metric_type, + conditions_description, + frequency, + message_collector, + enable, + ) + + def run_alert(self): + """This function needs to handle exception by itself""" + try: + graph = client_wrapper.get_current_graph() + gremlin_interface = graph.gremlin_interface + client = Client( + gremlin_interface["gremlin_endpoint"], + "g", + 
username=gremlin_interface["username"], + password=gremlin_interface["password"], + ) + client.submit("g.with('evaluationTimeout', 5000).V().limit(1)").all().result() + except Exception as e: + message = "Gremlin service unavailable: {0}".format(str(e)) + # unable to distinguish whether frontend or executor is unavailable, + # thus, we set the target "-" + alert_message = self.generate_alert_message("-", message) + self.alert(alert_message) + finally: + try: + client.close() + except Exception: # client is unbound if setup failed before Client(...) + pass + + def init_builtin_alert_rules(message_collector: AlertMessageCollector): alert_rules = {} # HighDiskUtilization @@ -82,9 +131,19 @@ def init_builtin_alert_rules(message_collector: AlertMessageCollector): severity="warning", metric_type="node", conditions_description="disk_utilization>80", - frequency=1, + frequency=180, message_collector=message_collector, threshold=80, enable=True, ) + if SOLUTION == "GRAPHSCOPE_INSIGHT": + alert_rules["GremlinServiceAvailable"] = GremlinServiceAvailableAlert( + name="GremlinServiceAvailable", + severity="emergency", + metric_type="service", + conditions_description="g.V().limit(1) failed", + frequency=5, + message_collector=message_collector, + enable=True, + ) return alert_rules diff --git a/flex/coordinator/gs_flex_coordinator/core/client_wrapper.py b/flex/coordinator/gs_flex_coordinator/core/client_wrapper.py index 7d2cddac7710..de089034f97e 100644 --- a/flex/coordinator/gs_flex_coordinator/core/client_wrapper.py +++ b/flex/coordinator/gs_flex_coordinator/core/client_wrapper.py @@ -26,7 +26,6 @@ from typing import List, Union import psutil - from gs_flex_coordinator.core.config import (CLUSTER_TYPE, CREATION_TIME, DATASET_WORKSPACE, INSTANCE_NAME, SOLUTION, WORKSPACE) @@ -37,11 +36,11 @@ encode_datetime, get_current_time) from gs_flex_coordinator.models import (DataSource, DeploymentInfo, EdgeDataSource, EdgeType, Graph, - GrootGraph, GrootSchema, JobStatus, - ModelSchema, NodeStatus, Procedure, - SchemaMapping, ServiceStatus, 
- StartServiceRequest, VertexDataSource, - VertexType, GrootDataloadingJobConfig) + GrootDataloadingJobConfig, GrootGraph, + GrootSchema, JobStatus, ModelSchema, + NodeStatus, Procedure, SchemaMapping, + ServiceStatus, StartServiceRequest, + VertexDataSource, VertexType) from gs_flex_coordinator.version import __version__ logger = logging.getLogger("graphscope") @@ -126,6 +125,9 @@ def get_groot_schema(self, graph_name: str) -> GrootSchema: def import_groot_schema(self, graph_name: str, schema: GrootSchema) -> str: return self._client.import_groot_schema(graph_name, schema.to_dict()) + def get_current_graph(self) -> GrootGraph: + return self._client.get_current_graph() + def create_graph(self, graph: Graph) -> str: # there are some tricks here, since schema is a keyword of openapi # specification, so it will be converted into the _schema field. @@ -282,7 +284,9 @@ def upload_file(self, filestorage) -> str: def create_groot_dataloading_job( self, graph_name: str, job_config: GrootDataloadingJobConfig ) -> str: - job_id = self._client.create_groot_dataloading_job(graph_name, job_config.to_dict()) + job_id = self._client.create_groot_dataloading_job( + graph_name, job_config.to_dict() + ) return job_id def list_groot_graph(self) -> List[GrootGraph]: diff --git a/flex/coordinator/gs_flex_coordinator/core/insight/graph.py b/flex/coordinator/gs_flex_coordinator/core/insight/graph.py index 92b62c7bdc90..e460fe95d33c 100644 --- a/flex/coordinator/gs_flex_coordinator/core/insight/graph.py +++ b/flex/coordinator/gs_flex_coordinator/core/insight/graph.py @@ -26,9 +26,6 @@ from graphscope.deploy.kubernetes.utils import (get_service_endpoints, resolve_api_client) from gremlin_python.driver.client import Client -from kubernetes import client as kube_client -from kubernetes import config as kube_config - from gs_flex_coordinator.core.config import (CLUSTER_TYPE, CREATION_TIME, ENABLE_DNS, GROOT_GREMLIN_PORT, GROOT_GRPC_PORT, GROOT_PASSWORD, @@ -39,6 +36,8 @@ encode_datetime, 
get_internal_ip, get_public_ip) from gs_flex_coordinator.version import __version__ +from kubernetes import client as kube_client +from kubernetes import config as kube_config logger = logging.getLogger("graphscope") @@ -283,12 +282,6 @@ def get_groot_graph_from_local(): client.submit( "g.with('evaluationTimeout', 5000).V().limit(1)" ).all().result() - print( - "DEBUG: ", - client.submit("g.with('evaluationTimeout', 5000).V().valueMap().limit(10)") - .all() - .result(), - ) except Exception as e: pass else: diff --git a/flex/coordinator/gs_flex_coordinator/core/insight/groot.py b/flex/coordinator/gs_flex_coordinator/core/insight/groot.py index e96c11dff38a..ae698565da22 100644 --- a/flex/coordinator/gs_flex_coordinator/core/insight/groot.py +++ b/flex/coordinator/gs_flex_coordinator/core/insight/groot.py @@ -106,6 +106,9 @@ def get_edge_full_label( ) -> str: return f"{source_vertex_type}_{type_name}_{destination_vertex_type}" + def get_current_graph(self): + return self._graph + def list_groot_graph(self) -> list: rlts = [self._graph.to_dict()] return rlts diff --git a/python/graphscope/gsctl/commands/insight.py b/python/graphscope/gsctl/commands/insight.py index ae6d5f69c3bf..14da3279cfa0 100644 --- a/python/graphscope/gsctl/commands/insight.py +++ b/python/graphscope/gsctl/commands/insight.py @@ -20,14 +20,20 @@ import yaml from graphscope.gsctl.impl import (create_edge_type, create_groot_dataloading_job, - create_vertex_type, delete_edge_type, + create_vertex_type, + delete_alert_receiver_by_id, + delete_alert_rule_by_name, delete_edge_type, delete_job_by_id, delete_vertex_type, get_datasource, get_deployment_info, get_job_by_id, get_node_status, import_datasource, import_groot_schema, - list_groot_graph, list_jobs, - unbind_edge_datasource, - unbind_vertex_datasource) + list_alert_messages, list_alert_receivers, + list_alert_rules, list_groot_graph, + list_jobs, register_receiver, unbind_edge_datasource, + unbind_vertex_datasource, + update_alert_messages, + 
update_alert_receiver_by_id, + update_alert_rule) from graphscope.gsctl.utils import (is_valid_file_path, read_yaml_file, terminal_display) @@ -49,6 +55,12 @@ def delete(): pass +@cli.group() +def update(): + """Update a resource from a file""" + pass + + @cli.group() def describe(): """Show details of a specific resource or group of resources""" @@ -484,5 +496,248 @@ def _construct_and_display_data(nodes): _construct_and_display_data(nodes) +@get.command() +def alertrule(): + """Display alert rules in database""" + + def _construct_and_display_data(rules): + if not rules: + click.secho("no alert rules found in database.", fg="blue") + return + head = [ + "NAME", + "SEVERITY", + "METRIC_TYPE", + "CONDITIONS_DESCRIPTION", + "FREQUENCY", + "ENABLE", + ] + data = [head] + for r in rules: + data.append( + [ + r.name, + r.severity, + r.metric_type, + r.conditions_description, + "{0} Min".format(r.frequency), + str(r.enable), + ] + ) + terminal_display(data) + + try: + rules = list_alert_rules() + except Exception as e: + click.secho(f"Failed to list alert rules: {str(e)}", fg="red") + else: + _construct_and_display_data(rules) + + +@update.command() +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file to use to update an alertrule", +) +def alertrule(filename): # noqa: F811 + """Update an alert rule in database""" + if not is_valid_file_path(filename): + click.secho("Invalid file: {0}".format(filename), fg="blue") + return + try: + rule = read_yaml_file(filename) + update_alert_rule(rule) + except Exception as e: + click.secho(f"Failed to update alert rule: {str(e)}", fg="red") + else: + click.secho(f"Update alert rule {rule['name']} successfully.", fg="green") + + +@delete.command() +@click.argument("RULE_NAME", required=True) +def alertrule(rule_name): # noqa: F811 + """Delete an alert rule in database""" + try: + delete_alert_rule_by_name(rule_name) + except Exception as e: + click.secho(f"Failed to delete alert rule: {str(e)}", fg="red") 
+ else: + click.secho( + f"Delete alert rule {rule_name} successfully.", + fg="green", + ) + + +@get.command() +@click.option( + "--status", + type=click.Choice(["unsolved", "dealing", "solved"]), + required=False, +) +@click.option( + "--severity", + type=click.Choice(["emergency", "warning"]), + required=False, +) +@click.option("--starttime", required=False, help="format with 2024-01-01-00-00-00") +@click.option("--endtime", required=False, help="format with 2024-01-02-12-30-00") +@click.option("--limit", required=False, default=100) +def alertmessage(status, severity, starttime, endtime, limit): + """Display alert messages in database""" + + def _construct_and_display_data(messages): + if not messages: + click.secho("no alert message found in database.", fg="blue") + return + head = [ + "MESSAGE_ID", + "SEVERITY", + "METRIC_TYPE", + "TARGET", + "TRIGGER_TIME", + "STATUS", + "MESSAGE", + ] + data = [head] + for m in messages[:limit]: + data.append( + [ + m.message_id, + m.severity, + m.metric_type, + ",".join(m.target), + m.trigger_time, + m.status, + m.message, + ] + ) + terminal_display(data) + + try: + messages = list_alert_messages(status, severity, starttime, endtime) + except Exception as e: + click.secho(f"Failed to list alert messages: {str(e)}", fg="red") + else: + _construct_and_display_data(messages) + + +@update.command() +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file to use to update alert messages in batch", +) +def alertmessage(filename): # noqa: F811 + """Update alert messages in batch""" + if not is_valid_file_path(filename): + click.secho("Invalid file: {0}".format(filename), fg="blue") + return + try: + request = read_yaml_file(filename) + update_alert_messages(request) + except Exception as e: + click.secho(f"Failed to update alert messages: {str(e)}", fg="red") + else: + click.secho("Update alert messages successfully.", fg="green") + + +@get.command() +def alertreceiver(): + """Display alert receivers in 
database""" + + def _construct_and_display_data(receivers): + if not receivers: + click.secho("no alert receiver found in database.", fg="blue") + return + head = [ + "RECEIVER_ID", + "TYPE", + "WEBHOOK_URL", + "AT_USERS_ID", + "IS_AT_ALL", + "ENABLE", + "MESSAGE", + ] + data = [head] + for r in receivers: + data.append( + [ + r.receiver_id, + r.type, + r.webhook_url, + ",".join(r.at_user_ids), + str(r.is_at_all), + str(r.enable), + r.message, + ] + ) + terminal_display(data) + + try: + receivers = list_alert_receivers() + except Exception as e: + click.secho(f"Failed to list alert receivers: {str(e)}", fg="red") + else: + _construct_and_display_data(receivers) + + +@create.command() +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file to use to create an alert receiver", +) +def alertreceiver(filename): # noqa: F811 + """Create an alert receiver in database""" + if not is_valid_file_path(filename): + click.secho("Invalid file: {0}".format(filename), fg="blue") + return + try: + receiver = read_yaml_file(filename) + register_receiver(receiver) + except Exception as e: + click.secho(f"Failed to create alert receiver: {str(e)}", fg="red") + else: + click.secho("Create alert receiver successfully.", fg="green") + + +@update.command() +@click.argument("receiver_id", required=True) +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file to use to update an alert receiver", +) +def alertreceiver(receiver_id, filename): # noqa: F811 + """Update an alert receiver by id in database""" + if not is_valid_file_path(filename): + click.secho("Invalid file: {0}".format(filename), fg="blue") + return + try: + receiver = read_yaml_file(filename) + update_alert_receiver_by_id(receiver_id, receiver) + except Exception as e: + click.secho(f"Failed to update the alert receiver: {str(e)}", fg="red") + else: + click.secho(f"Update alert receiver {receiver_id} successfully.", fg="green") + + +@delete.command() 
+@click.argument("receiver_id", required=True) +def alertreceiver(receiver_id): # noqa: F811 + """Delete an alert receiver by id in database""" + try: + delete_alert_receiver_by_id(receiver_id) + except Exception as e: + click.secho(f"Failed to delete alert receiver: {str(e)}", fg="red") + else: + click.secho(f"Delete alert receiver {receiver_id} successfully.", fg="green") + + if __name__ == "__main__": cli()