Skip to content

Commit

Permalink
Merge pull request #2589 from actiontech/audit_plan_sql_audit
Browse files Browse the repository at this point in the history
Audit plan sql audit optimize
  • Loading branch information
LordofAvernus authored Sep 6, 2024
2 parents e7afe89 + 7133c9f commit 8f0f709
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 40 deletions.
1 change: 1 addition & 0 deletions sqle/api/controller/v2/sql_manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type SqlManage struct {
InstanceName string `json:"instance_name"`
SchemaName string `json:"schema_name"`
AuditResult []*v1.AuditResult `json:"audit_result"`
AuditStatus string `json:"audit_status" enums:"being_audited"`
FirstAppearTimeStamp string `json:"first_appear_timestamp"`
LastReceiveTimeStamp string `json:"last_receive_timestamp"`
FpCount uint64 `json:"fp_count"`
Expand Down
6 changes: 6 additions & 0 deletions sqle/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17015,6 +17015,12 @@ var doc = `{
"$ref": "#/definitions/v1.AuditResult"
}
},
"audit_status": {
"type": "string",
"enum": [
"being_audited"
]
},
"endpoints": {
"type": "string"
},
Expand Down
6 changes: 6 additions & 0 deletions sqle/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -16999,6 +16999,12 @@
"$ref": "#/definitions/v1.AuditResult"
}
},
"audit_status": {
"type": "string",
"enum": [
"being_audited"
]
},
"endpoints": {
"type": "string"
},
Expand Down
4 changes: 4 additions & 0 deletions sqle/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4947,6 +4947,10 @@ definitions:
items:
$ref: '#/definitions/v1.AuditResult'
type: array
audit_status:
enum:
- being_audited
type: string
endpoints:
type: string
first_appear_timestamp:
Expand Down
27 changes: 18 additions & 9 deletions sqle/model/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,22 +481,31 @@ func (s *Storage) PullSQLFromManagerSQLQueue() ([]*SQLManageQueue, error) {
return sqls, err
}

func (s *Storage) RemoveSQLFromQueue(sql *SQLManageQueue) error {
return s.db.Unscoped().Delete(sql).Error
func (s *Storage) RemoveSQLFromQueue(txDB *gorm.DB, sql *SQLManageQueue) error {
return txDB.Unscoped().Delete(sql).Error
}

func (s *Storage) UpdateManagerSQL(sql *SQLManageRecord) error {
const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`, `audit_level`, `audit_results`,`priority`) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), `priority` = VALUES(priority), " +
"`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info), `audit_level`= VALUES(audit_level), `audit_results`= VALUES(audit_results)"
return s.db.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info, sql.AuditLevel, sql.AuditResults, sql.Priority).Error
func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error {
const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " +
"VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " +
"`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info)"
return txDB.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info).Error
}

func (s *Storage) UpdateManagerSQLStatus(sql *SQLManageRecord) error {
func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) error {
const query = ` INSERT INTO sql_manage_record_processes (sql_manage_record_id)
SELECT smr.id FROM sql_manage_records smr WHERE smr.sql_id = ?
ON DUPLICATE KEY UPDATE sql_manage_record_id = VALUES(sql_manage_record_id);`
return s.db.Exec(query, sql.SQLID).Error
return txDB.Exec(query, sql.SQLID).Error
}

func (s *Storage) UpdateManagerSQLBySqlId(sqlManageMap map[string]interface{}, sqlId string) error {
err := s.db.Model(&SQLManageRecord{}).Where("sql_id = ?", sqlId).
Updates(sqlManageMap).Error
if err != nil {
return err
}
return nil
}

func (s *Storage) UpdateAuditPlanLastCollectionTime(auditPlanID uint, collectionTime time.Time) error {
Expand Down
2 changes: 1 addition & 1 deletion sqle/model/instance_audit_plan_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ audit_plan_sqls.sql_fingerprint,
audit_plan_sqls.sql_text,
audit_plan_sqls.schema_name,
audit_plan_sqls.info,
audit_plan_sqls.audit_results,
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
audit_plan_sqls.priority
{{- template "body" . -}}
Expand Down
12 changes: 6 additions & 6 deletions sqle/model/instance_audit_plan_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ audit_plan_sqls.sql_fingerprint,
audit_plan_sqls.sql_text,
audit_plan_sqls.schema_name,
audit_plan_sqls.info,
audit_plan_sqls.audit_results,
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
audit_plan_sqls.priority
FROM sql_manage_records AS audit_plan_sqls
Expand All @@ -59,7 +59,7 @@ audit_plan_sqls.sql_fingerprint,
audit_plan_sqls.sql_text,
audit_plan_sqls.schema_name,
audit_plan_sqls.info,
audit_plan_sqls.audit_results,
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
audit_plan_sqls.priority
FROM sql_manage_records AS audit_plan_sqls
Expand Down Expand Up @@ -87,7 +87,7 @@ audit_plan_sqls.sql_fingerprint,
audit_plan_sqls.sql_text,
audit_plan_sqls.schema_name,
audit_plan_sqls.info,
audit_plan_sqls.audit_results,
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
audit_plan_sqls.priority
FROM sql_manage_records AS audit_plan_sqls
Expand Down Expand Up @@ -115,7 +115,7 @@ audit_plan_sqls.sql_fingerprint,
audit_plan_sqls.sql_text,
audit_plan_sqls.schema_name,
audit_plan_sqls.info,
audit_plan_sqls.audit_results,
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
audit_plan_sqls.priority
FROM sql_manage_records AS audit_plan_sqls
Expand Down Expand Up @@ -144,7 +144,7 @@ audit_plan_sqls.sql_fingerprint,
audit_plan_sqls.sql_text,
audit_plan_sqls.schema_name,
audit_plan_sqls.info,
audit_plan_sqls.audit_results,
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
audit_plan_sqls.priority
FROM sql_manage_records AS audit_plan_sqls
Expand Down Expand Up @@ -172,7 +172,7 @@ audit_plan_sqls.sql_fingerprint,
audit_plan_sqls.sql_text,
audit_plan_sqls.schema_name,
audit_plan_sqls.info,
audit_plan_sqls.audit_results,
IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results,
audit_plan_sqls.priority
FROM sql_manage_records AS audit_plan_sqls
Expand Down
74 changes: 50 additions & 24 deletions sqle/server/auditplan/job_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/actiontech/sqle/sqle/model"
"github.com/actiontech/sqle/sqle/server"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)

type AuditPlanHandlerJob struct {
Expand Down Expand Up @@ -61,38 +62,63 @@ func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) {
if len(sqlList) == 0 {
return
}
// 审核
sqlList, err = BatchAuditSQLs(sqlList, true)
if err != nil {
entry.Warnf("batch audit origin manager sql failed, error: %v", err)

// todo: 错误处理
if err = s.Tx(func(txDB *gorm.DB) error {
for _, sql := range sqlList {
err := s.SaveManagerSQL(txDB, sql)
if err != nil {
entry.Warnf("update manager sql failed, error: %v", err)
return err
}

// 更新状态表
err = s.UpdateManagerSQLStatus(txDB, sql)
if err != nil {
entry.Warnf("update manager sql status failed, error: %v", err)
return err
}
}

for _, sql := range queues {
err := s.RemoveSQLFromQueue(txDB, sql)
if err != nil {
entry.Warnf("remove manager sql queue failed, error: %v", err)
return err
}
}

return nil

}); err != nil {
return
}

go handlerSQLAudit(entry, sqlList)

}

// todo: 错误处理
func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) {
s := model.GetStorage()
sqlList, err := BatchAuditSQLs(sqlList, true)
if err != nil {
entry.Warnf("batch audit manager sql failed, error: %v", err)
}
// 设置高优先级
sqlList, err = SetSQLPriority(sqlList)
if err != nil {
entry.Warnf("check sql priority sql failed, error: %v", err)
return
entry.Warnf("set sql priority sql failed, error: %v", err)
}
// todo: 保证事务和错误处理
for _, sql := range sqlList {
err := s.UpdateManagerSQL(sql)
manageSqlParam := make(map[string]interface{}, 3)
manageSqlParam["audit_level"] = sql.AuditLevel
manageSqlParam["audit_results"] = sql.AuditResults
manageSqlParam["priority"] = sql.Priority
err = s.UpdateManagerSQLBySqlId(manageSqlParam, sql.SQLID)
if err != nil {
entry.Warnf("update manager sql failed, error: %v", err)
return
}

// 同时更新状态表
err = s.UpdateManagerSQLStatus(sql)
if err != nil {
entry.Warnf("update manager sql status failed, error: %v", err)
return
}

}
for _, sql := range queues {
err := s.RemoveSQLFromQueue(sql)
if err != nil {
entry.Warnf("remove manager sql queue failed, error: %v", err)
return
continue
}
}
}
Expand Down

0 comments on commit 8f0f709

Please sign in to comment.