Skip to content

Commit

Permalink
fix monitor info (#5278)
Browse files Browse the repository at this point in the history
* fix monitor info

* Merge remote-tracking branch 'origin/fix-monitor' into fix-monitor

* Merge remote-tracking branch 'origin/fix-monitor' into fix-monitor
  • Loading branch information
wallyxjh authored Dec 16, 2024
1 parent 2864b0a commit bfbff3f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 64 deletions.
11 changes: 6 additions & 5 deletions service/exceptionmonitor/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type QueryResult struct {
}

type Info struct {
// lastStatus、recoveryStatus、lastStatusTime、recoveryStatusTime、lastStatusInfo、recoveryStatusInfo
//todo 是否应该分几个状态,是否有状态不正确的地方
DatabaseClusterName string
Namespace string
DebtLevel string
Expand Down Expand Up @@ -74,16 +72,19 @@ const (
//StatusUpdating = "Updating"
StatusUnknown = ""
MonitorTypeALL = "all"
DiskChinese = "磁盘"
MemoryChinese = "内存"
CPUChinese = "CPU"
)

var (
ClientSet *kubernetes.Clientset
DynamicClient *dynamic.DynamicClient
DebtNamespaceMap = make(map[string]bool)
DiskFullNamespaceMap = make(map[string]bool)
DiskMonitorNamespaceMap = make(map[string]bool)
CPUMonitorNamespaceMap = make(map[string]bool)
MemMonitorNamespaceMap = make(map[string]bool)
CPUNotificationInfoMap = make(map[string]*Info)
MemNotificationInfoMap = make(map[string]*Info)
DiskNotificationInfoMap = make(map[string]*Info)
LastBackupStatusMap = make(map[string]string)
IsSendBackupStatusMap = make(map[string]string)
DatabaseNotificationInfoMap = make(map[string]*Info)
Expand Down
7 changes: 3 additions & 4 deletions service/exceptionmonitor/helper/monitor/database_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func checkDeletedDatabases() {
// DatabaseClusterUID: databaseClusterUID,
// Namespace: notificationInfo.Namespace,
// DatabaseClusterName: databaseClusterName,
// RecoveryStatus: "Deleted",
// RecoveryStatus: "Deleted",
//}
notificationInfo.RecoveryStatus = "Deleted"
notificationInfo.RecoveryTime = time.Now().Format("2006-01-02 15:04:05")
Expand Down Expand Up @@ -109,16 +109,15 @@ func processCluster(cluster metav1unstructured.Unstructured) {
getClusterDatabaseInfo(cluster, &notificationInfo)
switch notificationInfo.ExceptionStatus {
case api.StatusRunning, api.StatusStopped:
if _, ok := api.DatabaseNotificationInfoMap[notificationInfo.DatabaseClusterUID]; ok {
recoveryNotificationInfo := api.DatabaseNotificationInfoMap[notificationInfo.DatabaseClusterUID]
if value, ok := api.DatabaseNotificationInfoMap[notificationInfo.DatabaseClusterUID]; ok {
recoveryNotificationInfo := value
recoveryNotificationInfo.RecoveryStatus, recoveryNotificationInfo.RecoveryTime = getClusterDatabaseStatus(cluster, recoveryNotificationInfo)
handleClusterRecovery(recoveryNotificationInfo)
}
case api.StatusDeleting, api.StatusStopping:
// nothing to do
break
case api.StatusUnknown:
//一般都是在新建,应该发到新建的飞书群中
if _, ok := api.DatabaseNotificationInfoMap[notificationInfo.DatabaseClusterUID]; !ok {
api.DatabaseNotificationInfoMap[notificationInfo.DatabaseClusterUID] = &notificationInfo
//api.LastDatabaseClusterStatus[notificationInfo.DatabaseClusterUID] = notificationInfo.ExceptionStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@ func checkDatabasePerformanceInNamespace(namespace string) error {
func monitorCluster(cluster unstructured.Unstructured) {
notificationInfo := api.Info{}
getClusterDatabaseInfo(cluster, &notificationInfo)
//notificationInfo.DatabaseClusterName, notificationInfo.DatabaseType, notificationInfo.Namespace, notificationInfo.DatabaseClusterUID = cluster.GetName(), cluster.GetLabels()[api.DatabaseTypeLabel], cluster.GetNamespace(), string(cluster.GetUID())
//status, found, err := unstructured.NestedString(cluster.Object, "status", "phase")
//if err != nil || !found {
// log.Printf("Unable to get %s status in ns %s: %v", notificationInfo.DatabaseClusterName, notificationInfo.Namespace, err)
//}
debt, _, _ := checkDebt(notificationInfo.Namespace)
if !debt {
return
}
notificationInfo.NotificationType = notification.ExceptionType
notificationInfo.ExceptionType = "阀值"
if value, ok := api.CPUNotificationInfoMap[notificationInfo.DatabaseClusterUID]; ok {
notificationInfo = *value
} else if value, ok := api.MemNotificationInfoMap[notificationInfo.DatabaseClusterUID]; ok {
notificationInfo = *value
} else if value, ok := api.DiskNotificationInfoMap[notificationInfo.DatabaseClusterUID]; ok {
notificationInfo = *value
}
switch notificationInfo.ExceptionStatus {
case api.StatusDeleting, api.StatusCreating, api.StatusStopping, api.StatusStopped, api.StatusUnknown:
break
Expand All @@ -84,58 +85,99 @@ func monitorCluster(cluster unstructured.Unstructured) {

func handleCPUMemMonitor(notificationInfo *api.Info) {
if cpuUsage, err := CPUMemMonitor(notificationInfo, "cpu"); err == nil {
processUsage(cpuUsage, api.DatabaseCPUMonitorThreshold, "CPU", notificationInfo, api.CPUMonitorNamespaceMap)
processUsage(cpuUsage, api.DatabaseCPUMonitorThreshold, api.CPUChinese, notificationInfo)
} else {
log.Printf("Failed to monitor CPU: %v", err)
}
if memUsage, err := CPUMemMonitor(notificationInfo, "memory"); err == nil {
processUsage(memUsage, api.DatabaseMemMonitorThreshold, "内存", notificationInfo, api.MemMonitorNamespaceMap)
processUsage(memUsage, api.DatabaseMemMonitorThreshold, api.MemoryChinese, notificationInfo)
} else {
log.Printf("Failed to monitor Memory: %v", err)
}
}

func handleDiskMonitor(notificationInfo *api.Info) {
if maxUsage, err := checkPerformance(notificationInfo, "disk"); err == nil {
processUsage(maxUsage, api.DatabaseDiskMonitorThreshold, "磁盘", notificationInfo, api.DiskMonitorNamespaceMap)
processUsage(maxUsage, api.DatabaseDiskMonitorThreshold, api.DiskChinese, notificationInfo)
} else {
log.Printf("Failed to monitor Disk: %v", err)
}
}

func processUsage(usage float64, threshold float64, performanceType string, notificationInfo *api.Info, monitorMap map[string]bool) {
func processUsage(usage float64, threshold float64, performanceType string, notificationInfo *api.Info) {
notificationInfo.PerformanceType = performanceType
usageStr := strconv.FormatFloat(usage, 'f', 2, 64)
if performanceType == "CPU" {
if notificationInfo.PerformanceType == api.CPUChinese {
notificationInfo.CPUUsage = usageStr
} else if performanceType == "内存" {
} else if performanceType == api.MemoryChinese {
notificationInfo.MemUsage = usageStr
} else if performanceType == "磁盘" {
} else if performanceType == api.DiskChinese {
notificationInfo.DiskUsage = usageStr
}
if usage >= threshold && !monitorMap[notificationInfo.DatabaseClusterUID] {
alertMessage := notification.GetNotificationMessage(notificationInfo)
notificationInfo.FeishuWebHook = api.FeishuWebhookURLMap["FeishuWebhookURLImportant"]
if err := notification.SendFeishuNotification(notificationInfo, alertMessage); err != nil {
log.Printf("Failed to send notification: %v", err)
if usage >= threshold {
if _, ok := api.CPUNotificationInfoMap[notificationInfo.DatabaseClusterUID]; !ok && notificationInfo.PerformanceType == api.CPUChinese {
processException(notificationInfo, threshold)
}
if _, ok := api.MemNotificationInfoMap[notificationInfo.DatabaseClusterUID]; !ok && notificationInfo.PerformanceType == api.MemoryChinese {
processException(notificationInfo, threshold)
}
monitorMap[notificationInfo.DatabaseClusterUID] = true
if performanceType != "磁盘" {
return
if _, ok := api.DiskNotificationInfoMap[notificationInfo.DatabaseClusterUID]; !ok && notificationInfo.PerformanceType == api.DiskChinese {
processException(notificationInfo, threshold)
}
ZNThreshold := NumberToChinese(int(threshold))
if err := notification.SendToSms(notificationInfo, api.ClusterName, "数据库"+performanceType+"超过百分之"+ZNThreshold); err != nil {
log.Printf("Failed to send Sms: %v", err)
} else if usage < threshold {
if _, ok := api.CPUNotificationInfoMap[notificationInfo.DatabaseClusterUID]; ok && notificationInfo.PerformanceType == api.CPUChinese {
processRecovery(notificationInfo)
}
} else if usage < threshold && monitorMap[notificationInfo.DatabaseClusterUID] {
notificationInfo.NotificationType = "recovery"
notificationInfo.RecoveryTime = time.Now().Add(8 * time.Hour).Format("2006-01-02 15:04:05")
alertMessage := notification.GetNotificationMessage(notificationInfo)
notificationInfo.FeishuWebHook = api.FeishuWebhookURLMap["FeishuWebhookURLImportant"]
if err := notification.SendFeishuNotification(notificationInfo, alertMessage); err != nil {
log.Printf("Failed to send notification: %v", err)
if _, ok := api.MemNotificationInfoMap[notificationInfo.DatabaseClusterUID]; ok && notificationInfo.PerformanceType == api.MemoryChinese {
processRecovery(notificationInfo)
}
delete(monitorMap, notificationInfo.DatabaseClusterUID)
if _, ok := api.DiskNotificationInfoMap[notificationInfo.DatabaseClusterUID]; ok && notificationInfo.PerformanceType == api.DiskChinese {
processRecovery(notificationInfo)
}
}
}

// processException marks notificationInfo as an exception notification, sends
// the Feishu alert, and records the info in the notification map that matches
// its PerformanceType.
//
// CPU and memory alerts stop after being recorded. Every other performance
// type (disk, and also any unrecognised value) additionally triggers an SMS
// that mentions the threshold, rendered through NumberToChinese.
//
// A failure to send either the Feishu message or the SMS is only logged; the
// map bookkeeping still happens.
func processException(notificationInfo *api.Info, threshold float64) {
	notificationInfo.NotificationType = notification.ExceptionType

	// The message is built before the webhook is assigned; keep this order.
	message := notification.GetNotificationMessage(notificationInfo)
	notificationInfo.FeishuWebHook = api.FeishuWebhookURLMap["FeishuWebhookURLImportant"]
	sendErr := notification.SendFeishuNotification(notificationInfo, message)
	if sendErr != nil {
		log.Printf("Failed to send notification: %v", sendErr)
	}

	uid := notificationInfo.DatabaseClusterUID
	switch notificationInfo.PerformanceType {
	case api.CPUChinese:
		api.CPUNotificationInfoMap[uid] = notificationInfo
		return
	case api.MemoryChinese:
		api.MemNotificationInfoMap[uid] = notificationInfo
		return
	case api.DiskChinese:
		api.DiskNotificationInfoMap[uid] = notificationInfo
	}

	// Reached for everything except CPU and memory.
	thresholdText := NumberToChinese(int(threshold))
	smsContent := "数据库" + notificationInfo.PerformanceType + "超过百分之" + thresholdText
	smsErr := notification.SendToSms(notificationInfo, api.ClusterName, smsContent)
	if smsErr != nil {
		log.Printf("Failed to send Sms: %v", smsErr)
	}
}

// processRecovery marks notificationInfo as a recovery notification, copies
// the current ExceptionStatus into RecoveryStatus, stamps RecoveryTime, sends
// the Feishu message, and removes the cluster's entry from the notification
// map that matches its PerformanceType.
//
// A failure to send the Feishu message is only logged; the map entry is
// removed regardless.
func processRecovery(notificationInfo *api.Info) {
	notificationInfo.NotificationType = "recovery"
	notificationInfo.RecoveryStatus = notificationInfo.ExceptionStatus

	// NOTE(review): the fixed +8h shift presumably converts a UTC clock to
	// UTC+8 for display — confirm against the deployment's time zone.
	recoveredAt := time.Now().Add(8 * time.Hour)
	notificationInfo.RecoveryTime = recoveredAt.Format("2006-01-02 15:04:05")

	// The message is built before the webhook is assigned; keep this order.
	message := notification.GetNotificationMessage(notificationInfo)
	notificationInfo.FeishuWebHook = api.FeishuWebhookURLMap["FeishuWebhookURLImportant"]
	sendErr := notification.SendFeishuNotification(notificationInfo, message)
	if sendErr != nil {
		log.Printf("Failed to send notification: %v", sendErr)
	}

	uid := notificationInfo.DatabaseClusterUID
	switch notificationInfo.PerformanceType {
	case api.CPUChinese:
		delete(api.CPUNotificationInfoMap, uid)
	case api.MemoryChinese:
		delete(api.MemNotificationInfoMap, uid)
	case api.DiskChinese:
		delete(api.DiskNotificationInfoMap, uid)
	}
}

Expand Down
19 changes: 14 additions & 5 deletions service/exceptionmonitor/helper/monitor/quota_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,27 @@ import (

func QuotaMonitor() {
for api.QuotaMonitor {
if err := checkQuota(); err != nil {
if err := checkQuota(api.ClusterNS); err != nil {
log.Printf("Failed to check qouta: %v", err)
}
time.Sleep(3 * time.Hour)
}
}

func checkQuota() error {
namespaceList, _ := api.ClientSet.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
func checkQuota(namespaces []string) error {
var namespaceList []v1.Namespace

fmt.Println(len(namespaceList.Items))
for _, ns := range namespaceList.Items {
// Fetch namespaces based on MonitorType
if api.MonitorType == api.MonitorTypeALL {
namespaces, _ := api.ClientSet.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
namespaceList = namespaces.Items
} else {
for _, ns := range namespaces {
namespace, _ := api.ClientSet.CoreV1().Namespaces().Get(context.Background(), ns, metav1.GetOptions{})
namespaceList = append(namespaceList, *namespace)
}
}
for _, ns := range namespaceList {
if !strings.Contains(ns.Name, "ns-") {
continue
}
Expand Down
27 changes: 9 additions & 18 deletions service/exceptionmonitor/helper/notification/feishu.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,14 @@ func GetNotificationMessage(notificationInfo *api.Info) string {
headerTemplate := "red"
titleContent := "数据库" + notificationInfo.ExceptionType + "告警"
usage := ""
if notificationInfo.PerformanceType == "CPU" {
if notificationInfo.PerformanceType == api.CPUChinese {
usage = notificationInfo.CPUUsage
} else if notificationInfo.PerformanceType == "内存" {
} else if notificationInfo.PerformanceType == api.MemoryChinese {
usage = notificationInfo.MemUsage
} else if notificationInfo.PerformanceType == "磁盘" {
} else if notificationInfo.PerformanceType == api.DiskChinese {
usage = notificationInfo.DiskUsage
}

//公共部分,状态和阀值的异常、恢复过程都需要,需要判断是否首次发送信息,是的话,就用这里,不是的话,就跳过(在之前的内容上追加)
commonElements := []map[string]interface{}{
{
"tag": "div",
Expand Down Expand Up @@ -117,7 +116,6 @@ func GetNotificationMessage(notificationInfo *api.Info) string {

if notificationInfo.NotificationType == ExceptionType && notificationInfo.ExceptionType == "状态" {
exceptionElements := []map[string]interface{}{
//这个异常时间需要给值
{
"tag": "div",
"text": map[string]string{
Expand Down Expand Up @@ -147,7 +145,7 @@ func GetNotificationMessage(notificationInfo *api.Info) string {
},
}
notificationInfo.FeishuInfo = append(commonElements, exceptionElements...)
} else if notificationInfo.ExceptionType == "阀值" {
} else if notificationInfo.NotificationType == ExceptionType && notificationInfo.ExceptionType == "阀值" {
exceptionElements := []map[string]interface{}{
{
"tag": "div",
Expand All @@ -161,24 +159,21 @@ func GetNotificationMessage(notificationInfo *api.Info) string {
}

if notificationInfo.NotificationType == "recovery" {
// todo 拿到之前的发送信息并加上,已做状态监控,未做阀值监控
headerTemplate = "blue"
titleContent = "数据库" + notificationInfo.ExceptionType + "恢复通知"

//获取之前发送的飞书内容
separatorElements := []map[string]interface{}{
{
"tag": "div",
"text": map[string]string{
"content": "-------------------------------------------",
"content": "-------------------------------------数据库恢复信息-------------------------------------",
"tag": "lark_md",
},
},
}
notificationInfo.FeishuInfo = append(notificationInfo.FeishuInfo, separatorElements...)
//elements = commonElements

if notificationInfo.ExceptionType == "阀值" {
//todo 数据库阀值的恢复时间怎么跟其它统一起来,需要在数据库阀值恢复中增加恢复时间
usageRecoveryElements := []map[string]interface{}{
{
"tag": "div",
Expand Down Expand Up @@ -212,7 +207,6 @@ func GetNotificationMessage(notificationInfo *api.Info) string {
"config": map[string]bool{
"wide_screen_mode": true,
},
//elements替换成notificationInfo.FeishuInfo
"elements": notificationInfo.FeishuInfo,
"header": map[string]interface{}{
"template": headerTemplate,
Expand Down Expand Up @@ -253,11 +247,11 @@ func SendFeishuNotification(notification *api.Info, message string) error {

func getMessageIDMap(performanceType string) map[string]string {
switch performanceType {
case "磁盘":
case api.DiskChinese:
return api.DatabaseDiskMessageIDMap
case "内存":
case api.MemoryChinese:
return api.DatabaseMemMessageIDMap
case "CPU":
case api.CPUChinese:
return api.DatabaseCPUMessageIDMap
case "Backup":
return api.DatabaseBackupMessageIDMap
Expand All @@ -273,8 +267,6 @@ func updateFeishuNotification(messageID, message string) error {
MessageId(messageID).
Body(larkim.NewPatchMessageReqBodyBuilder().
Content(message).Build()).Build()

fmt.Println(messageID)
resp, err := feiShuClient.Im.Message.Patch(context.Background(), req)
if err != nil {
log.Println("Error:", err)
Expand Down Expand Up @@ -318,7 +310,6 @@ func createFeishuNotification(notification *api.Info, message string, messageIDM
} else {
messageIDMap[notification.DatabaseClusterName] = messageID
}
fmt.Println(messageIDMap)
return nil
}

Expand Down

0 comments on commit bfbff3f

Please sign in to comment.