Skip to content

Commit

Permalink
Merge pull request #2501 from actiontech/origin_manage_sql_queues
Browse files Browse the repository at this point in the history
Origin manage sql queues
  • Loading branch information
LordofAvernus authored Jul 31, 2024
2 parents b253ce2 + 8a61bbd commit 684da9f
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 27 deletions.
2 changes: 1 addition & 1 deletion sqle/api/controller/v1/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type AuditPlan struct {

type CreatInstanceAuditPlanResV1 struct {
controller.BaseRes
Data CreatInstanceAuditPlanRes
Data CreatInstanceAuditPlanRes `json:"data"`
}

type CreatInstanceAuditPlanRes struct {
Expand Down
1 change: 1 addition & 0 deletions sqle/dms/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func convertInstance(instance *dmsV1.ListDBService) (*model.Instance, error) {
Desc: instance.Desc,
AdditionalParams: additionalParams,
SqlQueryConfig: sqlQueryConfig,
Business: instance.Business,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion sqle/driver/mysql/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func Ping(entry *logrus.Entry, instance *driverV2.DSN) error {
}

func (c *Executor) UseSchema(schemaName string) error {
_, err := c.Db.Exec(fmt.Sprintf("use %s", schemaName))
_, err := c.Db.Exec(fmt.Sprintf("use `%s`", schemaName))
if err != nil {
return errors.New(errors.ConnectRemoteDatabaseError, err)
}
Expand Down
2 changes: 1 addition & 1 deletion sqle/driver/mysql/session/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ func (c *Context) GetColumnCardinality(tn *ast.TableName, columnName string) (in
}

func (c *Context) UseSchema(schemaName string) error {
_, err := c.e.Db.Exec(fmt.Sprintf("use %s", schemaName))
_, err := c.e.Db.Exec(fmt.Sprintf("use `%s`", schemaName))
if err != nil {
return errors.Wrap(err, "exec use schema")
}
Expand Down
74 changes: 50 additions & 24 deletions sqle/model/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type AuditPlanDetail struct {

func (s *Storage) ListActiveAuditPlanDetail() ([]*AuditPlanDetail, error) {
var aps []*AuditPlanDetail
err := s.db.Table("audit_plans_v2").Joins("JOIN instance_audit_plans ON instance_audit_plans.id = audit_plans_v2.instance_audit_plan_id").
err := s.db.Model(AuditPlanV2{}).Joins("JOIN instance_audit_plans ON instance_audit_plans.id = audit_plans_v2.instance_audit_plan_id").
Where("audit_plans_v2.active_status = ? AND instance_audit_plans.active_status = ?", ActiveStatusNormal, ActiveStatusNormal).
Select("audit_plans_v2.*,instance_audit_plans.project_id,instance_audit_plans.db_type,instance_audit_plans.token,instance_audit_plans.instance_name,instance_audit_plans.create_user_id").
Scan(&aps).Error
Expand All @@ -64,7 +64,7 @@ func (s *Storage) ListActiveAuditPlanDetail() ([]*AuditPlanDetail, error) {
func (s *Storage) GetAuditPlanDetailByIDType(id int, typ string) (*AuditPlanDetail, error) {
var aps *AuditPlanDetail

err := s.db.Table("audit_plans_v2").Joins("JOIN instance_audit_plans ON instance_audit_plans.id = audit_plans_v2.instance_audit_plan_id").
err := s.db.Model(AuditPlanV2{}).Joins("JOIN instance_audit_plans ON instance_audit_plans.id = audit_plans_v2.instance_audit_plan_id").
Where("audit_plans_v2.instance_audit_plan_id = ?", id).
Where("audit_plans_v2.type = ?", typ).
Select("audit_plans_v2.*,instance_audit_plans.project_id,instance_audit_plans.db_type,instance_audit_plans.token,instance_audit_plans.instance_name,instance_audit_plans.create_user_id").
Expand All @@ -90,7 +90,7 @@ func (s *Storage) GetAuditPlanDetailByID(id uint) (*AuditPlanDetail, error) {
func (s *Storage) getAuditPlanDetailByID(id uint) (*AuditPlanDetail, bool, error) {
var ap *AuditPlanDetail

err := s.db.Table("audit_plans_v2").Joins("JOIN instance_audit_plans ON instance_audit_plans.id = audit_plans_v2.instance_audit_plan_id").
err := s.db.Model(AuditPlanV2{}).Joins("JOIN instance_audit_plans ON instance_audit_plans.id = audit_plans_v2.instance_audit_plan_id").
Where("audit_plans_v2.id = ?", id).
Select("audit_plans_v2.*,instance_audit_plans.project_id,instance_audit_plans.db_type,instance_audit_plans.token,instance_audit_plans.instance_name,instance_audit_plans.create_user_id").
Where("audit_plans_v2.active_status = ? AND instance_audit_plans.active_status = ?", ActiveStatusNormal, ActiveStatusNormal).
Expand Down Expand Up @@ -235,7 +235,7 @@ type SQLManager struct {

func (s *Storage) GetAuditPlanByInstanceIdAndType(instanceAuditPlanID string, auditPlanType string) (*AuditPlanV2, bool, error) {
auditPlan := &AuditPlanV2{}
err := s.db.Table("audit_plans_v2").
err := s.db.Model(AuditPlanV2{}).
Where("audit_plans_v2.instance_audit_plan_id = ?", instanceAuditPlanID).
Where("audit_plans_v2.type = ?", auditPlanType).
First(auditPlan).Error
Expand Down Expand Up @@ -272,33 +272,59 @@ func (s *Storage) BatchSaveAuditPlans(auditPlans []*AuditPlanV2) error {
}

func (s *Storage) DeleteInstanceAuditPlan(instanceAuditPlanId string) error {
err := s.db.Exec(`UPDATE instance_audit_plans iap
LEFT JOIN audit_plans_v2 ap ON iap.id = ap.instance_audit_plan_id
LEFT JOIN origin_manage_sqls oms ON oms.source_id = ap.id
LEFT JOIN sql_managers sm ON sm.origin_manage_sql_id = oms.id
SET iap.deleted_at = now(),
ap.deleted_at = now(),
oms.deleted_at = now(),
sm.deleted_at = now()
WHERE iap.ID = ?`, instanceAuditPlanId).Error
return err
return s.Tx(func(txDB *gorm.DB) error {
// 删除队列表中数据
err := txDB.Exec(`DELETE FROM origin_manage_sql_queues USING origin_manage_sql_queues
JOIN audit_plans_v2 ap ON ap.id=origin_manage_sql_queues.source_id
JOIN instance_audit_plans iap ON iap.id = ap.instance_audit_plan_id
WHERE iap.ID = ?`, instanceAuditPlanId).Error
if err != nil {
return err
}
err = txDB.Exec(`UPDATE instance_audit_plans iap
LEFT JOIN audit_plans_v2 ap ON iap.id = ap.instance_audit_plan_id
LEFT JOIN origin_manage_sqls oms ON oms.source_id = ap.id
LEFT JOIN sql_managers sm ON sm.origin_manage_sql_id = oms.id
SET iap.deleted_at = now(),
ap.deleted_at = now(),
oms.deleted_at = now(),
sm.deleted_at = now()
WHERE iap.ID = ?`, instanceAuditPlanId).Error
if err != nil {
return err
}
return nil
})
}

func (s *Storage) DeleteAuditPlan(instanceAuditPlanId, auditPlanType string) error {
err := s.db.Exec(`UPDATE instance_audit_plans iap
LEFT JOIN audit_plans_v2 ap ON iap.id = ap.instance_audit_plan_id
LEFT JOIN origin_manage_sqls oms ON oms.source_id = ap.id
LEFT JOIN sql_managers sm ON sm.origin_manage_sql_id = oms.id
SET ap.deleted_at = now(),
oms.deleted_at = now(),
sm.deleted_at = now()
WHERE iap.ID = ? AND ap.type = ?`, instanceAuditPlanId, auditPlanType).Error
return err
return s.Tx(func(txDB *gorm.DB) error {
// 删除队列表中数据
err := txDB.Exec(`DELETE FROM origin_manage_sql_queues USING origin_manage_sql_queues
JOIN audit_plans_v2 ap ON ap.id=origin_manage_sql_queues.source_id
JOIN instance_audit_plans iap ON iap.id = ap.instance_audit_plan_id
WHERE iap.ID = ? AND ap.type = ?`, instanceAuditPlanId, auditPlanType).Error
if err != nil {
return err
}
err = txDB.Exec(`UPDATE instance_audit_plans iap
LEFT JOIN audit_plans_v2 ap ON iap.id = ap.instance_audit_plan_id
LEFT JOIN origin_manage_sqls oms ON oms.source_id = ap.id
LEFT JOIN sql_managers sm ON sm.origin_manage_sql_id = oms.id
SET ap.deleted_at = now(),
oms.deleted_at = now(),
sm.deleted_at = now()
WHERE iap.ID = ? AND ap.type = ?`, instanceAuditPlanId, auditPlanType).Error
if err != nil {
return err
}
return nil
})
}

func (s *Storage) GetAuditPlanDetailByType(InstanceAuditPlanId, auditPlanType string) (*AuditPlanDetail, bool, error) {
var auditPlanDetail *AuditPlanDetail
err := s.db.Table("audit_plans_v2").Joins("JOIN instance_audit_plans ON instance_audit_plans.id = audit_plans_v2.instance_audit_plan_id").
err := s.db.Model(AuditPlanV2{}).Joins("JOIN instance_audit_plans ON instance_audit_plans.id = audit_plans_v2.instance_audit_plan_id").
Where("instance_audit_plans.id = ? AND audit_plans_v2.type = ?", InstanceAuditPlanId, auditPlanType).
Scan(&auditPlanDetail).Error
if err == gorm.ErrRecordNotFound {
Expand Down

0 comments on commit 684da9f

Please sign in to comment.