Skip to content

Commit

Permalink
fix: fix the audit plan being deleted, causing sql audit panic
Browse files Browse the repository at this point in the history
  • Loading branch information
iwanghc committed Nov 15, 2024
1 parent 7d29fef commit fb94c18
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 23 deletions.
67 changes: 45 additions & 22 deletions sqle/model/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,23 +368,23 @@ func (s *Storage) BatchSaveAuditPlans(auditPlans []*AuditPlanV2) error {

func (s *Storage) DeleteInstanceAuditPlan(instanceAuditPlanId string) error {
return s.Tx(func(txDB *gorm.DB) error {
// 删除队列表中数据
err := txDB.Exec(`DELETE FROM sql_manage_queues USING sql_manage_queues
JOIN audit_plans_v2 ap ON ap.id=sql_manage_queues.source_id
JOIN instance_audit_plans iap ON iap.id = ap.instance_audit_plan_id
WHERE iap.ID = ?`, instanceAuditPlanId).Error
// 删除管控及相关表
err := txDB.Exec(`DELETE p, q, r
FROM sql_manage_record_processes p
JOIN sql_manage_records r ON p.sql_manage_record_id = r.id
LEFT JOIN sql_manage_queues q ON q.source_id = r.source_id
JOIN audit_plans_v2 a ON a.instance_audit_plan_id = r.source_id AND a.type = r.source
WHERE a.instance_audit_plan_id = ?`, instanceAuditPlanId).Error
if err != nil {
return err
}

// 软删除实例扫描任务
err = txDB.Exec(`UPDATE instance_audit_plans iap
LEFT JOIN audit_plans_v2 ap ON iap.id = ap.instance_audit_plan_id
LEFT JOIN audit_plan_task_infos apti ON apti.audit_plan_id = ap.id
LEFT JOIN sql_manage_records smr ON smr.source_id = ap.instance_audit_plan_id AND smr.source = ap.type
LEFT JOIN sql_manage_record_processes smrp ON smrp.sql_manage_record_id = smr.id
SET iap.deleted_at = now(),
ap.deleted_at = now(),
smr.deleted_at = now(),
smrp.deleted_at = now(),
apti.deleted_at = now()
WHERE iap.ID = ?`, instanceAuditPlanId).Error
if err != nil {
Expand All @@ -396,20 +396,24 @@ func (s *Storage) DeleteInstanceAuditPlan(instanceAuditPlanId string) error {

func (s *Storage) DeleteAuditPlan(auditPlanID int) error {
return s.Tx(func(txDB *gorm.DB) error {
// 删除队列表中数据
err := txDB.Exec(`DELETE FROM sql_manage_queues USING sql_manage_queues
JOIN audit_plans_v2 ap ON ap.id=sql_manage_queues.source_id
WHERE ap.id = ?`, auditPlanID).Error
// 管控及相关表的数据需要硬删除
// 当在同一个实例扫描任务下删除并重新创建了同一类型的扫描任务,它采集到的sql指纹可能会与被删除的任务采集到sql指纹一致
// 因为source id是实例任务id,不能保证新的扫描任务生成指纹时具有唯一性
// 所以当其他扫描任务采集的sql与被删除的sql管控记录指纹一致时,需要插入一条新的sql管控记录,而不是去update被软删除的记录
err := txDB.Exec(`DELETE p, q, r
FROM sql_manage_record_processes p
JOIN sql_manage_records r ON p.sql_manage_record_id = r.id
LEFT JOIN sql_manage_queues q ON q.source_id = r.source_id
JOIN audit_plans_v2 a ON a.instance_audit_plan_id = r.source_id AND a.type = r.source
WHERE a.id = ?`, auditPlanID).Error
if err != nil {
return err
}

// 软删除扫描任务
err = txDB.Exec(`UPDATE audit_plans_v2 ap
LEFT JOIN audit_plan_task_infos apti ON apti.audit_plan_id = ap.id
LEFT JOIN sql_manage_records smr ON smr.source_id = ap.instance_audit_plan_id AND smr.source = ap.type
LEFT JOIN sql_manage_record_processes smrp ON smrp.sql_manage_record_id = smr.id
SET ap.deleted_at = now(),
smr.deleted_at = now(),
smrp.deleted_at = now(),
apti.deleted_at = now()
WHERE ap.id = ?`, auditPlanID).Error
if err != nil {
Expand All @@ -419,13 +423,15 @@ func (s *Storage) DeleteAuditPlan(auditPlanID int) error {
})
}

// ErrAuditPlanNotFound is the sentinel error that identifies a missing audit
// plan. Compare against it with errors.Is rather than by message.
var ErrAuditPlanNotFound = errors.New(errors.DataNotExist, fmt.Errorf("cant find audit plan"))

// AuditPlanNotFoundErr is the original name of ErrAuditPlanNotFound and holds
// the very same error value, so existing errors.Is checks keep working.
//
// Deprecated: use ErrAuditPlanNotFound, which follows the ErrXxx naming
// convention.
var AuditPlanNotFoundErr = ErrAuditPlanNotFound //nolint:errname // kept for backward compatibility

Check failure on line 426 in sqle/model/instance_audit_plan.go

View workflow job for this annotation

GitHub Actions / lint

the variable name `AuditPlanNotFoundErr` should conform to the `ErrXxx` format (errname)

// GetAuditPlanDetailByInstAuditPlanIdAndType looks up the audit plan of the
// given type that belongs to the given instance audit plan.
//
// It returns the error from GetAuditPlanDetailByType unchanged when the lookup
// itself fails, and AuditPlanNotFoundErr when the lookup succeeds but no
// matching plan exists.
func (s *Storage) GetAuditPlanDetailByInstAuditPlanIdAndType(instAuditPlanId string, auditPlanType string) (*AuditPlanDetail, error) {
	ap, exist, err := s.GetAuditPlanDetailByType(instAuditPlanId, auditPlanType)
	if err != nil {
		return nil, err
	}
	if !exist {
		// Return the sentinel (not an ad-hoc fmt.Errorf) so callers can
		// recognise the "plan was deleted" case with errors.Is.
		return nil, AuditPlanNotFoundErr
	}
	// err is known to be nil here, so there is nothing left to wrap.
	return ap, nil
}
Expand All @@ -436,10 +442,13 @@ func (s *Storage) GetAuditPlanDetailByType(InstanceAuditPlanId, auditPlanType st
Where("instance_audit_plans.id = ? AND audit_plans_v2.type = ?", InstanceAuditPlanId, auditPlanType).
Select("audit_plans_v2.*,instance_audit_plans.project_id,instance_audit_plans.db_type,instance_audit_plans.token,instance_audit_plans.instance_id,instance_audit_plans.create_user_id").
Scan(&auditPlanDetail).Error
if err == gorm.ErrRecordNotFound {
return auditPlanDetail, false, nil
if err != nil {
return auditPlanDetail, false, errors.New(errors.ConnectStorageError, err)
}
if auditPlanDetail == nil {
return nil, false, nil
}
return auditPlanDetail, true, errors.New(errors.ConnectStorageError, err)
return auditPlanDetail, true, nil
}

func (s *Storage) GetInstanceAuditPlanByInstanceID(instanceID int64) (*InstanceAuditPlan, bool, error) {
Expand Down Expand Up @@ -474,14 +483,28 @@ func (s *Storage) PushSQLToManagerSQLQueue(sqls []*SQLManageQueue) error {

// PullSQLFromManagerSQLQueue reads one batch of at most 100 rows from the SQL
// manage queue and returns them together with any query error.
//
// The returned slice is always non-nil; it is empty when the queue has no rows.
func (s *Storage) PullSQLFromManagerSQLQueue() ([]*SQLManageQueue, error) {
	// Upper bound on the number of queued SQLs fetched per call.
	const batchSize = 100

	sqls := []*SQLManageQueue{}
	// Limit has to be chained before Find: Find runs the query, so a Limit
	// added afterwards never reaches the executed statement.
	err := s.db.Limit(batchSize).Find(&sqls).Error
	return sqls, err
}

// RemoveSQLFromQueue deletes the given queue entry using the supplied
// transaction handle and returns the error reported by that delete.
//
// The delete is issued through Unscoped(), which in GORM bypasses soft-delete
// handling so the row is removed rather than marked as deleted.
func (s *Storage) RemoveSQLFromQueue(txDB *gorm.DB, sql *SQLManageQueue) error {
	unscoped := txDB.Unscoped()
	result := unscoped.Delete(sql)
	return result.Error
}

// DeleteSQLManageRecordBySourceId removes, inside a single transaction, the
// SQL manage records whose source_id equals sourceId together with their
// joined process rows. It returns the error of the DELETE statement, if any.
//
// NOTE(review): the statement uses an inner join, so a record without any
// process row would not be matched, and it filters on source_id only (not on
// the source type) — confirm both are intended.
func (s *Storage) DeleteSQLManageRecordBySourceId(sourceId string) error {
	// The statement text is kept exactly as executed; only the Go wrapper
	// around it is restructured.
	const deleteRecordsWithProcesses = `DELETE p, r
FROM sql_manage_record_processes p
JOIN sql_manage_records r ON p.sql_manage_record_id = r.id
WHERE r.source_id = ?`

	return s.Tx(func(txDB *gorm.DB) error {
		return txDB.Exec(deleteRecordsWithProcesses, sourceId).Error
	})
}

func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error {
const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " +
"VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " +
Expand Down
18 changes: 17 additions & 1 deletion sqle/server/auditplan/job_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package auditplan

import (
"database/sql"
"errors"
"fmt"
"strings"
"time"

driverV2 "github.com/actiontech/sqle/sqle/driver/v2"
Expand Down Expand Up @@ -124,7 +126,7 @@ func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) {
}

func BatchAuditSQLs(sqlList []*model.SQLManageRecord, isSkipAuditedSql bool) ([]*model.SQLManageRecord, error) {

s := model.GetStorage()
// SQL聚合
sqlMap := make(map[string][]*model.SQLManageRecord)
for _, sql := range sqlList {
Expand Down Expand Up @@ -153,6 +155,20 @@ func BatchAuditSQLs(sqlList []*model.SQLManageRecord, isSkipAuditedSql bool) ([]
return nil, err
}
resp, err := meta.Handler.Audit(sqls)
// 当管控队列表中sql出栈审核时扫描任务被删除,则清空已经save到管控表的sql。
if err != nil && errors.Is(err, model.AuditPlanNotFoundErr) {
log.NewEntry().Warnf("audit sqls in task fail %v,cant find audit plan by id %s", err, sqls[0].SourceId)
err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId)
if err != nil {
log.NewEntry().Errorf("delete sql manage record fail %v", err)
}
for k := range sqlMap {
if strings.HasPrefix(k, sqls[0].SourceId+":") {
delete(sqlMap, k)
}
}
continue
}
if err != nil {
log.NewEntry().Errorf("audit sqls in task fail %v,ignore audit result", err)
auditSQLs = append(auditSQLs, sqls...)
Expand Down

0 comments on commit fb94c18

Please sign in to comment.