From d6ac8cf7a2316a60b8b9afcbb675834106579d8e Mon Sep 17 00:00:00 2001 From: zhsy Date: Sat, 25 Nov 2023 12:39:16 +0800 Subject: [PATCH 01/15] =?UTF-8?q?=E6=8F=90=E4=BA=A4pg=E5=BA=93=E8=A1=A8?= =?UTF-8?q?=E5=85=83=E6=95=B0=E6=8D=AE=E5=8A=9F=E8=83=BD=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + sqle/pkg/postgresql/db.go | 239 ++++++++++++++++++++++++++++++++++ sqle/server/auditplan/meta.go | 30 ++++- sqle/server/auditplan/task.go | 131 +++++++++++++++++++ 4 files changed, 397 insertions(+), 4 deletions(-) create mode 100644 sqle/pkg/postgresql/db.go diff --git a/go.mod b/go.mod index eebdd458d6..217bb95965 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/ungerik/go-dry v0.0.0-20210209114055-a3e162a9e62e github.com/urfave/cli/v2 v2.8.1 github.com/vektah/gqlparser/v2 v2.5.1 + github.com/lib/pq v1.10.2 golang.org/x/net v0.15.0 google.golang.org/grpc v1.50.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/sqle/pkg/postgresql/db.go b/sqle/pkg/postgresql/db.go new file mode 100644 index 0000000000..cc5cc6f81b --- /dev/null +++ b/sqle/pkg/postgresql/db.go @@ -0,0 +1,239 @@ +package postgresql + +import "C" +import ( + "database/sql" + "fmt" + _ "github.com/lib/pq" + "log" + "strings" +) + +type DSN struct { + Host string + Port string + User string + Password string + Database string +} + +func (d *DSN) String() string { + return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", + d.User, d.Password, d.Host, d.Port, d.Database) +} + +type DB struct { + Db *sql.DB + IsCaseSensitive bool +} + +func NewDB(dsn *DSN) (*DB, error) { + // 创建一个数据库连接池 + db, err := sql.Open("postgres", dsn.String()) + if err != nil { + return nil, err + } + + // 设置连接池的最大连接数和空闲连接数 + db.SetMaxOpenConns(100) // 设置最大连接数 + db.SetMaxIdleConns(10) // 设置空闲连接数 + + // 检查数据库连接 + err = db.Ping() + if err != nil { + return nil, err + } + + return &DB{Db: db}, nil +} + +func (o *DB) Close() error { + return o.Db.Close() +} + +func (o *DB) GetCaseSensitive() bool { + var isCaseSensitive bool + query := "SELECT setting FROM pg_settings WHERE name = 'quote_all_identifiers'" + + sqls, err := getResultSqls(o.Db, query) + if err != nil { + return false + } + if len(sqls) == 0 { + return false + } + for _, sqlContent := range sqls { + if strings.ToLower(sqlContent) == "on" { + return true + } + } + return isCaseSensitive +} + +func (o *DB) GetAllUserSchemas() ([]string, error) { + query := "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname != 'information_schema'" + + sqls, err := getResultSqls(o.Db, query) + if err != nil { + return nil, err + } + return sqls, nil +} + +func (o *DB) ShowSchemaTables(schema string) ([]string, error) { + query := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ + " where table_schema='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) + + if o.IsCaseSensitive { + schema = strings.ToLower(schema) + query = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ + " where lower(table_schema)='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) + } + return getResultSqls(o.Db, query) +} + +func (o *DB) ShowSchemaViews(schema string) ([]string, error) { + query := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ + " where table_schema='%s' and TABLE_TYPE='VIEW'", schema) + + if o.IsCaseSensitive { + schema = strings.ToLower(schema) + query = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ + "where lower(table_schema)='%s' and TABLE_TYPE='VIEW'", schema) + } + return getResultSqls(o.Db, query) +} + +func (o *DB) ShowCreateTables(database, tableName string, schemas []string) ([]string, error) { + tables := make([]string, 0) + for _, schema := range schemas { + tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName) + if o.IsCaseSensitive { + database = strings.ToLower(database) + schema = strings.ToLower(schema) + tableName = strings.ToLower(tableName) + } + columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", + database, schema, tableName) + if o.IsCaseSensitive { + columnsCondition = fmt.Sprintf("lower(table_catalog) = '%s' AND lower(table_schema) = '%s' "+ + "AND lower(table_name) = '%s'", database, schema, tableName) + } + // 获取列定义,多个英文逗号分割 + columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ + "CASE "+ + " WHEN data_type IN ('character', 'character varying', 'text') "+ + " THEN data_type || '(' || character_maximum_length || ')' "+ + " WHEN data_type IN ('numeric', 'decimal') "+ + " THEN data_type || '(' || numeric_precision || ',' || numeric_scale || ')' "+ + " WHEN data_type IN ('integer', 'smallint', 'bigint') THEN data_type "+ + " ELSE data_type "+ + " END "+ + " || "+ + " CASE "+ + " WHEN column_default != '' THEN ' DEFAULT ' || column_default ELSE '' END "+ + " || "+ + " CASE "+ + " WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+ + " FROM information_schema.columns "+ + " WHERE %s GROUP BY table_name", columnsCondition) + sqls, err := getResultSqls(o.Db, columns) + if err != nil { + log.Printf("search column definition error:%s\n", err) + return nil, err + } + if len(sqls) == 0 { + continue + } + tableDDl += strings.Join(sqls, "") + constraintsCondition := fmt.Sprintf("d.datname = '%s' AND n.nspname = '%s' AND C.relname = '%s'", + database, schema, tableName) + if o.IsCaseSensitive { + constraintsCondition = fmt.Sprintf("lower(d.datname) = '%s' AND lower(n.nspname) = '%s' "+ + "AND lower(C.relname) = '%s'", database, schema, tableName) + } + // 获取所有约束 + constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+ + " pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+ + " FROM pg_catalog.pg_constraint r "+ + " JOIN pg_catalog.pg_class C ON C.OID = r.conrelid "+ + " JOIN pg_catalog.pg_namespace n ON n.OID = C.relnamespace "+ + " JOIN pg_catalog.pg_database d ON d.datname = n.nspname "+ + " WHERE %s", constraintsCondition) + sqls, err = getResultSqls(o.Db, constraints) + if err != nil { + log.Printf("search constraint definition error:%s\n", err) + return nil, err + } + for _, sqlContext := range sqls { + tableDDl += ",\n" + sqlContext + } + tableDDl += ")" + indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName) + if o.IsCaseSensitive { + indexesCondition = fmt.Sprintf("lower(schemaname) = '%s' and lower(tablename) = '%s'", + schema, tableName) + } + // 获取索引 + indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+ + " WHERE %s", indexesCondition) + sqls, err = getResultSqls(o.Db, indexes) + if err != nil { + log.Printf("search index definition error:%s\n", err) + return nil, err + } + for _, sqlContent := range sqls { + if strings.Contains(sqlContent, "CREATE UNIQUE INDEX") { + continue + } + tableDDl += ";\n" + sqlContent + } + tables = append(tables, tableDDl) + } + return tables, nil +} + +func (o *DB) ShowCreateViews(database, tableName string) ([]string, error) { + query := fmt.Sprintf( + "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ + " AS create_view_statement "+ + " FROM information_schema.views WHERE table_catalog = '%s' AND table_name = '%s'", + database, tableName) + + if o.IsCaseSensitive { + database = strings.ToLower(database) + tableName = strings.ToLower(tableName) + query = fmt.Sprintf( + "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ + " AS create_view_statement "+ + " FROM information_schema.views WHERE lower(table_catalog) = '%s' AND lower(table_name) = '%s'", + database, tableName) + } + return getResultSqls(o.Db, query) +} + +func getResultSqls(db *sql.DB, query string) ([]string, error) { + rows, err := db.Query(query) + if err != nil { + return nil, err + } + defer func(rows *sql.Rows) { + innerErr := rows.Close() + if innerErr != nil { + log.Printf("Close rows error:%s\n", innerErr) + } + }(rows) + if rows.Err() != nil { + return nil, rows.Err() + } + sqls := make([]string, 0) + for rows.Next() { + var sqlContent string + err = rows.Scan(&sqlContent) + if err != nil { + return nil, err + } + sqls = append(sqls, sqlContent) + } + return sqls, nil +} diff --git a/sqle/server/auditplan/meta.go b/sqle/server/auditplan/meta.go index 89d12ef98d..aa0d686287 100644 --- a/sqle/server/auditplan/meta.go +++ b/sqle/server/auditplan/meta.go @@ -31,13 +31,15 @@ const ( TypeAllAppExtract = "all_app_extract" TypeBaiduRdsMySQLSlowLog = "baidu_rds_mysql_slow_log" TypeSQLFile = "sql_file" + TypePostgreSQLSchemaMeta = "Postgresql_schema_meta" ) const ( - InstanceTypeAll = "" - InstanceTypeMySQL = "MySQL" - InstanceTypeOracle = "Oracle" - InstanceTypeTiDB = "TiDB" + InstanceTypeAll = "" + InstanceTypeMySQL = "MySQL" + InstanceTypeOracle = "Oracle" + InstanceTypeTiDB = "TiDB" + InstanceTypePostgreSQL = "PostgreSQL" ) const ( @@ -324,6 +326,26 @@ var Metas = []Meta{ InstanceType: InstanceTypeAll, CreateTask: NewDefaultTask, }, + { + Type: TypePostgreSQLSchemaMeta, + Desc: "库表元数据", + InstanceType: InstanceTypePostgreSQL, + CreateTask: NewPostgreSQLSchemaMetaTask, + Params: []*params.Param{ + { + Key: paramKeyCollectIntervalMinute, + Desc: "采集周期(分钟)", + Value: "60", + Type: params.ParamTypeInt, + }, + { + Key: "collect_view", + Desc: "是否采集视图信息", + Value: "0", + Type: params.ParamTypeBool, + }, + }, + }, } var MetaMap = map[string]Meta{} diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index 735212b960..7b053290ec 100644 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/actiontech/sqle/sqle/pkg/postgresql" "net/http" "strconv" "strings" @@ -1546,3 +1547,133 @@ func NewBaiduRdsMySQLSlowLogTask(entry *logrus.Entry, ap *model.AuditPlan) Task return b } + +// PostgreSQL库表元数据 +type PostgreSQLSchemaMetaTask struct { + *sqlCollector +} + +func NewPostgreSQLSchemaMetaTask(entry *logrus.Entry, ap *model.AuditPlan) Task { + sqlCollector := newSQLCollector(entry, ap) + task := &PostgreSQLSchemaMetaTask{ + sqlCollector, + } + sqlCollector.do = task.collectorDo + return task +} + +func (at *PostgreSQLSchemaMetaTask) collectorDo() { + if at.ap.InstanceName == "" { + at.logger.Warnf("instance is not configured") + return + } + if at.ap.InstanceDatabase == "" { + at.logger.Warnf("instance schema is not configured") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + defer cancel() + instance, _, err := dms.GetInstanceInProjectByName(ctx, string(at.ap.ProjectId), at.ap.InstanceName) + if err != nil { + at.logger.Warnf("get postgreSQL instance error:%s", err) + return + } + + dsn := &postgresql.DSN{ + Host: instance.Host, + Port: instance.Port, + User: instance.User, + Password: instance.Password, + Database: at.ap.InstanceDatabase, + } + db, err := postgresql.NewDB(dsn) + if err != nil { + at.logger.Errorf("connect to instance fail, error: %s", err) + return + } + defer db.Close() + db.IsCaseSensitive = db.GetCaseSensitive() + + tables, err := db.ShowSchemaTables(at.ap.InstanceDatabase) + if err != nil { + at.logger.Errorf("get schema table fail, error: %s", err) + return + } + var views []string + if at.ap.Params.GetParam("collect_view").Bool() { + views, err = db.ShowSchemaViews(at.ap.InstanceDatabase) + if err != nil { + at.logger.Errorf("get schema view fail, error: %s", err) + return + } + } + + schemas, err := db.GetAllUserSchemas() + if err != nil { + at.logger.Errorf("get database=%s schemas error: %s", at.ap.InstanceDatabase, err) + return + } + if len(schemas) == 0 { + at.logger.Errorf("get database=%s schemas empty error: %s", at.ap.InstanceDatabase, err) + return + } + + sqls := make([]string, 0, len(tables)+len(views)) + for _, table := range tables { + tableSqls, err := db.ShowCreateTables(at.ap.InstanceDatabase, table, schemas) + if err != nil { + at.logger.Errorf("show create table fail, error: %s", err) + return + } + sqls = append(sqls, tableSqls...) + } + for _, view := range views { + viewSqls, err := db.ShowCreateViews(at.ap.InstanceDatabase, view) + if err != nil { + at.logger.Errorf("show create view fail, error: %s", err) + return + } + sqls = append(sqls, viewSqls...) + } + if len(sqls) > 0 { + err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(sqls)) + if err != nil { + at.logger.Errorf("save schema meta to storage fail, error: %s", err) + } + } +} + +func (at *PostgreSQLSchemaMetaTask) Audit() (*AuditResultResp, error) { + task, err := getTaskWithInstanceByAuditPlan(at.ap, at.persist) + if err != nil { + return nil, err + } + return at.baseTask.audit(task) +} + +func (at *PostgreSQLSchemaMetaTask) GetSQLs(args map[string]interface{}) ([]Head, []map[string] /* head name */ string, uint64, error) { + auditPlanSQLs, count, err := at.persist.GetAuditPlanSQLsByReq(args) + if err != nil { + return nil, nil, count, err + } + head, rows := buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs) + return head, rows, count, nil +} + +func buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs []*model.AuditPlanSQLListDetail) ([]Head, []map[string] /* head name */ string) { + head := []Head{ + { + Name: "sql", + Desc: "SQL语句", + Type: "sql", + }, + } + rows := make([]map[string]string, 0, len(auditPlanSQLs)) + for _, sql := range auditPlanSQLs { + rows = append(rows, map[string]string{ + "sql": sql.SQLContent, + }) + } + return head, rows +} From d9d772008ef4d14022499101ab55f5eb50f3f888 Mon Sep 17 00:00:00 2001 From: zhsy Date: Mon, 27 Nov 2023 15:41:03 +0800 Subject: [PATCH 02/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=A1=A8=E5=92=8C?= =?UTF-8?q?=E8=A7=86=E5=9B=BE=E7=9A=84=E8=8E=B7=E5=8F=96=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=EF=BC=8C=E6=8C=89=E7=85=A7=E6=95=B0=E6=8D=AE=E5=BA=93,schema,?= =?UTF-8?q?=E8=A1=A8/=E8=A7=86=E5=9B=BE=20=E4=B8=89=E5=B1=82=E7=BB=93?= =?UTF-8?q?=E6=9E=84=E6=9D=A5=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/pkg/postgresql/db.go | 172 +++++++++++++++++----------------- sqle/server/auditplan/task.go | 90 ++++++++++++------ 2 files changed, 146 insertions(+), 116 deletions(-) diff --git a/sqle/pkg/postgresql/db.go b/sqle/pkg/postgresql/db.go index cc5cc6f81b..b43419414d 100644 --- a/sqle/pkg/postgresql/db.go +++ b/sqle/pkg/postgresql/db.go @@ -104,101 +104,98 @@ func (o *DB) ShowSchemaViews(schema string) ([]string, error) { return getResultSqls(o.Db, query) } -func (o *DB) ShowCreateTables(database, tableName string, schemas []string) ([]string, error) { +func (o *DB) ShowCreateTables(database, schema, tableName string) ([]string, error) { tables := make([]string, 0) - for _, schema := range schemas { - tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName) - if o.IsCaseSensitive { - database = strings.ToLower(database) - schema = strings.ToLower(schema) - tableName = strings.ToLower(tableName) - } - columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", - database, schema, tableName) - if o.IsCaseSensitive { - columnsCondition = fmt.Sprintf("lower(table_catalog) = '%s' AND lower(table_schema) = '%s' "+ - "AND lower(table_name) = '%s'", database, schema, tableName) - } - // 获取列定义,多个英文逗号分割 - columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ - "CASE "+ - " WHEN data_type IN ('character', 'character varying', 'text') "+ - " THEN data_type || '(' || character_maximum_length || ')' "+ - " WHEN data_type IN ('numeric', 'decimal') "+ - " THEN data_type || '(' || numeric_precision || ',' || numeric_scale || ')' "+ - " WHEN data_type IN ('integer', 'smallint', 'bigint') THEN data_type "+ - " ELSE data_type "+ - " END "+ - " || "+ - " CASE "+ - " WHEN column_default != '' THEN ' DEFAULT ' || column_default ELSE '' END "+ - " || "+ - " CASE "+ - " WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+ - " FROM information_schema.columns "+ - " WHERE %s GROUP BY table_name", columnsCondition) - sqls, err := getResultSqls(o.Db, columns) - if err != nil { - log.Printf("search column definition error:%s\n", err) - return nil, err - } - if len(sqls) == 0 { + tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName) + if o.IsCaseSensitive { + database = strings.ToLower(database) + schema = strings.ToLower(schema) + tableName = strings.ToLower(tableName) + } + columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", + database, schema, tableName) + if o.IsCaseSensitive { + columnsCondition = fmt.Sprintf("lower(table_catalog) = '%s' AND lower(table_schema) = '%s' "+ + "AND lower(table_name) = '%s'", database, schema, tableName) + } + // 获取列定义,多个英文逗号分割 + columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ + "CASE "+ + " WHEN data_type IN ('char', 'varchar', 'character', 'character varying', 'text') "+ + " THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+ + " WHEN data_type IN ('numeric', 'decimal') "+ + " THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+ + " WHEN data_type IN ('integer', 'smallint', 'bigint') THEN data_type "+ + " ELSE data_type "+ + " END "+ + " || "+ + " CASE "+ + " WHEN column_default != '' THEN ' DEFAULT ' || column_default ELSE '' END "+ + " || "+ + " CASE "+ + " WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+ + " FROM information_schema.columns "+ + " WHERE %s GROUP BY table_name", columnsCondition) + sqls, err := getResultSqls(o.Db, columns) + if err != nil { + log.Printf("search column definition error:%s\n", err) + return nil, err + } + if len(sqls) == 0 { + return tables, nil + } + tableDDl += strings.Join(sqls, "") + constraintsCondition := fmt.Sprintf("n.nspname = '%s' AND C.relname = '%s'", schema, tableName) + if o.IsCaseSensitive { + constraintsCondition = fmt.Sprintf("lower(n.nspname) = '%s' "+ + "AND lower(C.relname) = '%s'", schema, tableName) + } + // 获取所有约束 + constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+ + " pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+ + " FROM pg_catalog.pg_constraint r "+ + " JOIN pg_catalog.pg_class C ON C.OID = r.conrelid "+ + " JOIN pg_catalog.pg_namespace n ON n.OID = C.relnamespace "+ + " WHERE %s", constraintsCondition) + sqls, err = getResultSqls(o.Db, constraints) + if err != nil { + log.Printf("search constraint definition error:%s\n", err) + return nil, err + } + for _, sqlContext := range sqls { + tableDDl += ",\n" + sqlContext + } + tableDDl += ")" + indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName) + if o.IsCaseSensitive { + indexesCondition = fmt.Sprintf("lower(schemaname) = '%s' and lower(tablename) = '%s'", + schema, tableName) + } + // 获取索引 + indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+ + " WHERE %s", indexesCondition) + sqls, err = getResultSqls(o.Db, indexes) + if err != nil { + log.Printf("search index definition error:%s\n", err) + return nil, err + } + for _, sqlContent := range sqls { + if strings.Contains(sqlContent, "CREATE UNIQUE INDEX") { continue } - tableDDl += strings.Join(sqls, "") - constraintsCondition := fmt.Sprintf("d.datname = '%s' AND n.nspname = '%s' AND C.relname = '%s'", - database, schema, tableName) - if o.IsCaseSensitive { - constraintsCondition = fmt.Sprintf("lower(d.datname) = '%s' AND lower(n.nspname) = '%s' "+ - "AND lower(C.relname) = '%s'", database, schema, tableName) - } - // 获取所有约束 - constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+ - " pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+ - " FROM pg_catalog.pg_constraint r "+ - " JOIN pg_catalog.pg_class C ON C.OID = r.conrelid "+ - " JOIN pg_catalog.pg_namespace n ON n.OID = C.relnamespace "+ - " JOIN pg_catalog.pg_database d ON d.datname = n.nspname "+ - " WHERE %s", constraintsCondition) - sqls, err = getResultSqls(o.Db, constraints) - if err != nil { - log.Printf("search constraint definition error:%s\n", err) - return nil, err - } - for _, sqlContext := range sqls { - tableDDl += ",\n" + sqlContext - } - tableDDl += ")" - indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName) - if o.IsCaseSensitive { - indexesCondition = fmt.Sprintf("lower(schemaname) = '%s' and lower(tablename) = '%s'", - schema, tableName) - } - // 获取索引 - indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+ - " WHERE %s", indexesCondition) - sqls, err = getResultSqls(o.Db, indexes) - if err != nil { - log.Printf("search index definition error:%s\n", err) - return nil, err - } - for _, sqlContent := range sqls { - if strings.Contains(sqlContent, "CREATE UNIQUE INDEX") { - continue - } - tableDDl += ";\n" + sqlContent - } - tables = append(tables, tableDDl) + tableDDl += ";\n" + sqlContent } + tables = append(tables, tableDDl) return tables, nil } -func (o *DB) ShowCreateViews(database, tableName string) ([]string, error) { +func (o *DB) ShowCreateViews(database, schema, tableName string) ([]string, error) { query := fmt.Sprintf( "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ " AS create_view_statement "+ - " FROM information_schema.views WHERE table_catalog = '%s' AND table_name = '%s'", - database, tableName) + " FROM information_schema.views "+ + " WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", + database, schema, tableName) if o.IsCaseSensitive { database = strings.ToLower(database) @@ -206,8 +203,9 @@ func (o *DB) ShowCreateViews(database, tableName string) ([]string, error) { query = fmt.Sprintf( "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ " AS create_view_statement "+ - " FROM information_schema.views WHERE lower(table_catalog) = '%s' AND lower(table_name) = '%s'", - database, tableName) + " FROM information_schema.views "+ + " WHERE lower(table_catalog) = '%s' AND lower(table_schema) = '%s' AND lower(table_name) = '%s'", + database, schema, tableName) } return getResultSqls(o.Db, query) } diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index 7b053290ec..a9e02ecbeb 100644 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1595,20 +1595,6 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { defer db.Close() db.IsCaseSensitive = db.GetCaseSensitive() - tables, err := db.ShowSchemaTables(at.ap.InstanceDatabase) - if err != nil { - at.logger.Errorf("get schema table fail, error: %s", err) - return - } - var views []string - if at.ap.Params.GetParam("collect_view").Bool() { - views, err = db.ShowSchemaViews(at.ap.InstanceDatabase) - if err != nil { - at.logger.Errorf("get schema view fail, error: %s", err) - return - } - } - schemas, err := db.GetAllUserSchemas() if err != nil { at.logger.Errorf("get database=%s schemas error: %s", at.ap.InstanceDatabase, err) @@ -1619,23 +1605,69 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { return } - sqls := make([]string, 0, len(tables)+len(views)) - for _, table := range tables { - tableSqls, err := db.ShowCreateTables(at.ap.InstanceDatabase, table, schemas) - if err != nil { - at.logger.Errorf("show create table fail, error: %s", err) - return - } - sqls = append(sqls, tableSqls...) + wg := sync.WaitGroup{} + wg.Add(len(schemas) * 2) + tableMutex := sync.Mutex{} + viewMutex := sync.Mutex{} + sqls := make([]string, 0) + finalTableSqls := make([]string, 0) + finalViewSqls := make([]string, 0) + for _, schema := range schemas { + go func(schema string) { + defer wg.Done() + tables, err := db.ShowSchemaTables(schema) + if err != nil { + at.logger.Errorf("get schema table fail, error: %s", err) + return + } + for _, table := range tables { + tableSqls, err := db.ShowCreateTables(at.ap.InstanceDatabase, schema, table) + if err != nil { + at.logger.Errorf("show create table fail, error: %s", err) + return + } + tableMutex.Lock() + if len(tableSqls) > 0 { + finalTableSqls = append(finalTableSqls, tableSqls...) + } + tableMutex.Unlock() + } + }(schema) + + go func(schema string) { + defer wg.Done() + var views []string + if at.ap.Params.GetParam("collect_view").Bool() { + views, err = db.ShowSchemaViews(schema) + if err != nil { + at.logger.Errorf("get schema view fail, error: %s", err) + return + } + } + for _, view := range views { + viewSqls, err := db.ShowCreateViews(at.ap.InstanceDatabase, schema, view) + if err != nil { + at.logger.Errorf("show create view fail, error: %s", err) + return + } + viewMutex.Lock() + if len(viewSqls) > 0 { + finalViewSqls = append(finalViewSqls, viewSqls...) + } + viewMutex.Unlock() + } + }(schema) } - for _, view := range views { - viewSqls, err := db.ShowCreateViews(at.ap.InstanceDatabase, view) - if err != nil { - at.logger.Errorf("show create view fail, error: %s", err) - return - } - sqls = append(sqls, viewSqls...) + wg.Wait() + + if len(finalTableSqls) > 0 { + sqls = append(sqls, finalTableSqls...) } + + if len(finalViewSqls) > 0 { + sqls = append(sqls, finalViewSqls...) + } + if len(sqls) > 0 { err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(sqls)) if err != nil { From 4c8e54c6f96f1edbeeed22de8b6d09b973262ce7 Mon Sep 17 00:00:00 2001 From: zhsy Date: Sat, 16 Mar 2024 22:22:06 +0800 Subject: [PATCH 03/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9pg=E5=BA=93=E8=A1=A8?= =?UTF-8?q?=E5=85=83=E6=95=B0=E6=8D=AE=E7=9A=84pg=E7=9A=84=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E8=BF=9E=E6=8E=A5=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/pkg/postgresql/db.go | 237 --------------------------- sqle/server/auditplan/task.go | 294 +++++++++++++++++++++++++++------- 2 files changed, 233 insertions(+), 298 deletions(-) delete mode 100644 sqle/pkg/postgresql/db.go diff --git a/sqle/pkg/postgresql/db.go b/sqle/pkg/postgresql/db.go deleted file mode 100644 index b43419414d..0000000000 --- a/sqle/pkg/postgresql/db.go +++ /dev/null @@ -1,237 +0,0 @@ -package postgresql - -import "C" -import ( - "database/sql" - "fmt" - _ "github.com/lib/pq" - "log" - "strings" -) - -type DSN struct { - Host string - Port string - User string - Password string - Database string -} - -func (d *DSN) String() string { - return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", - d.User, d.Password, d.Host, d.Port, d.Database) -} - -type DB struct { - Db *sql.DB - IsCaseSensitive bool -} - -func NewDB(dsn *DSN) (*DB, error) { - // 创建一个数据库连接池 - db, err := sql.Open("postgres", dsn.String()) - if err != nil { - return nil, err - } - - // 设置连接池的最大连接数和空闲连接数 - db.SetMaxOpenConns(100) // 设置最大连接数 - db.SetMaxIdleConns(10) // 设置空闲连接数 - - // 检查数据库连接 - err = db.Ping() - if err != nil { - return nil, err - } - - return &DB{Db: db}, nil -} - -func (o *DB) Close() error { - return o.Db.Close() -} - -func (o *DB) GetCaseSensitive() bool { - var isCaseSensitive bool - query := "SELECT setting FROM pg_settings WHERE name = 'quote_all_identifiers'" - - sqls, err := getResultSqls(o.Db, query) - if err != nil { - return false - } - if len(sqls) == 0 { - return false - } - for _, sqlContent := range sqls { - if strings.ToLower(sqlContent) == "on" { - return true - } - } - return isCaseSensitive -} - -func (o *DB) GetAllUserSchemas() ([]string, error) { - query := "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname != 'information_schema'" - - sqls, err := getResultSqls(o.Db, query) - if err != nil { - return nil, err - } - return sqls, nil -} - -func (o *DB) ShowSchemaTables(schema string) ([]string, error) { - query := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ - " where table_schema='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) - - if o.IsCaseSensitive { - schema = strings.ToLower(schema) - query = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ - " where lower(table_schema)='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) - } - return getResultSqls(o.Db, query) -} - -func (o *DB) ShowSchemaViews(schema string) ([]string, error) { - query := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ - " where table_schema='%s' and TABLE_TYPE='VIEW'", schema) - - if o.IsCaseSensitive { - schema = strings.ToLower(schema) - query = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ - "where lower(table_schema)='%s' and TABLE_TYPE='VIEW'", schema) - } - return getResultSqls(o.Db, query) -} - -func (o *DB) ShowCreateTables(database, schema, tableName string) ([]string, error) { - tables := make([]string, 0) - tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName) - if o.IsCaseSensitive { - database = strings.ToLower(database) - schema = strings.ToLower(schema) - tableName = strings.ToLower(tableName) - } - columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", - database, schema, tableName) - if o.IsCaseSensitive { - columnsCondition = fmt.Sprintf("lower(table_catalog) = '%s' AND lower(table_schema) = '%s' "+ - "AND lower(table_name) = '%s'", database, schema, tableName) - } - // 获取列定义,多个英文逗号分割 - columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ - "CASE "+ - " WHEN data_type IN ('char', 'varchar', 'character', 'character varying', 'text') "+ - " THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+ - " WHEN data_type IN ('numeric', 'decimal') "+ - " THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+ - " WHEN data_type IN ('integer', 'smallint', 'bigint') THEN data_type "+ - " ELSE data_type "+ - " END "+ - " || "+ - " CASE "+ - " WHEN column_default != '' THEN ' DEFAULT ' || column_default ELSE '' END "+ - " || "+ - " CASE "+ - " WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+ - " FROM information_schema.columns "+ - " WHERE %s GROUP BY table_name", columnsCondition) - sqls, err := getResultSqls(o.Db, columns) - if err != nil { - log.Printf("search column definition error:%s\n", err) - return nil, err - } - if len(sqls) == 0 { - return tables, nil - } - tableDDl += strings.Join(sqls, "") - constraintsCondition := fmt.Sprintf("n.nspname = '%s' AND C.relname = '%s'", schema, tableName) - if o.IsCaseSensitive { - constraintsCondition = fmt.Sprintf("lower(n.nspname) = '%s' "+ - "AND lower(C.relname) = '%s'", schema, tableName) - } - // 获取所有约束 - constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+ - " pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+ - " FROM pg_catalog.pg_constraint r "+ - " JOIN pg_catalog.pg_class C ON C.OID = r.conrelid "+ - " JOIN pg_catalog.pg_namespace n ON n.OID = C.relnamespace "+ - " WHERE %s", constraintsCondition) - sqls, err = getResultSqls(o.Db, constraints) - if err != nil { - log.Printf("search constraint definition error:%s\n", err) - return nil, err - } - for _, sqlContext := range sqls { - tableDDl += ",\n" + sqlContext - } - tableDDl += ")" - indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName) - if o.IsCaseSensitive { - indexesCondition = fmt.Sprintf("lower(schemaname) = '%s' and lower(tablename) = '%s'", - schema, tableName) - } - // 获取索引 - indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+ - " WHERE %s", indexesCondition) - sqls, err = getResultSqls(o.Db, indexes) - if err != nil { - log.Printf("search index definition error:%s\n", err) - return nil, err - } - for _, sqlContent := range sqls { - if strings.Contains(sqlContent, "CREATE UNIQUE INDEX") { - continue - } - tableDDl += ";\n" + sqlContent - } - tables = append(tables, tableDDl) - return tables, nil -} - -func (o *DB) ShowCreateViews(database, schema, tableName string) ([]string, error) { - query := fmt.Sprintf( - "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ - " AS create_view_statement "+ - " FROM information_schema.views "+ - " WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", - database, schema, tableName) - - if o.IsCaseSensitive { - database = strings.ToLower(database) - tableName = strings.ToLower(tableName) - query = fmt.Sprintf( - "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ - " AS create_view_statement "+ - " FROM information_schema.views "+ - " WHERE lower(table_catalog) = '%s' AND lower(table_schema) = '%s' AND lower(table_name) = '%s'", - database, schema, tableName) - } - return getResultSqls(o.Db, query) -} - -func getResultSqls(db *sql.DB, query string) ([]string, error) { - rows, err := db.Query(query) - if err != nil { - return nil, err - } - defer func(rows *sql.Rows) { - innerErr := rows.Close() - if innerErr != nil { - log.Printf("Close rows error:%s\n", innerErr) - } - }(rows) - if rows.Err() != nil { - return nil, rows.Err() - } - sqls := make([]string, 0) - for rows.Next() { - var sqlContent string - err = rows.Scan(&sqlContent) - if err != nil { - return nil, err - } - sqls = append(sqls, sqlContent) - } - return sqls, nil -} diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index a9e02ecbeb..37cfe095f6 100644 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -5,7 +5,7 @@ import ( "context" "encoding/json" "fmt" - "github.com/actiontech/sqle/sqle/pkg/postgresql" + "github.com/actiontech/sqle/sqle/driver" "net/http" "strconv" "strings" @@ -1571,31 +1571,34 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { at.logger.Warnf("instance schema is not configured") return } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) - defer cancel() - instance, _, err := dms.GetInstanceInProjectByName(ctx, string(at.ap.ProjectId), at.ap.InstanceName) + instance, _, err := dms.GetInstanceInProjectByName(context.Background(), string(at.ap.ProjectId), at.ap.InstanceName) if err != nil { - at.logger.Warnf("get postgreSQL instance error:%s", err) + at.logger.Errorf("get pg instance in project by instanceName failed: %s\n", err) return } - dsn := &postgresql.DSN{ - Host: instance.Host, - Port: instance.Port, - User: instance.User, - Password: instance.Password, - Database: at.ap.InstanceDatabase, + pluginMgr := driver.GetPluginManager() + if !pluginMgr.IsOptionalModuleEnabled(instance.DbType, driverV2.OptionalModuleQuery) { + at.logger.Errorf("collect pg schema meta failed: %v", driver.NewErrPluginAPINotImplement(driverV2.OptionalModuleQuery)) + return } - db, err := postgresql.NewDB(dsn) + plugin, err := pluginMgr.OpenPlugin(at.logger, instance.DbType, &driverV2.Config{DSN: &driverV2.DSN{ + Host: instance.Host, + Port: instance.Port, + User: instance.User, + Password: instance.Password, + AdditionalParams: instance.AdditionalParams, + DatabaseName: at.ap.InstanceDatabase, + }}) if err != nil { - at.logger.Errorf("connect to instance fail, error: %s", err) + at.logger.Errorf("connect to instance fail, error: %v", err) return } - defer db.Close() - db.IsCaseSensitive = db.GetCaseSensitive() - schemas, err := db.GetAllUserSchemas() + // 是否大小写敏感 + isSensitive := at.GetCaseSensitiveForPg(plugin) + + schemas, err := at.GetAllUserSchemas(plugin) if err != nil { at.logger.Errorf("get database=%s schemas error: %s", at.ap.InstanceDatabase, err) return @@ -1605,60 +1608,47 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { return } - wg := sync.WaitGroup{} - wg.Add(len(schemas) * 2) - tableMutex := sync.Mutex{} - viewMutex := sync.Mutex{} sqls := make([]string, 0) finalTableSqls := make([]string, 0) finalViewSqls := make([]string, 0) for _, schema := range schemas { - go func(schema string) { - defer wg.Done() - tables, err := db.ShowSchemaTables(schema) + tables, err := at.ShowSchemaTablesForPg(plugin, schema, isSensitive) + if err != nil { + at.logger.Errorf("get schema table fail, error: %s", err) + continue + } + for _, table := range tables { + tableSqls, err := at.ShowCreateTablesForPg(plugin, at.ap.InstanceDatabase, schema, table, isSensitive) if err != nil { - at.logger.Errorf("get schema table fail, error: %s", err) - return + at.logger.Errorf("show create table fail, error: %s", err) + continue } - for _, table := range tables { - tableSqls, err := db.ShowCreateTables(at.ap.InstanceDatabase, schema, table) - if err != nil { - at.logger.Errorf("show create table fail, error: %s", err) - return - } - tableMutex.Lock() - if len(tableSqls) > 0 { - finalTableSqls = append(finalTableSqls, tableSqls...) - } - tableMutex.Unlock() + if len(tableSqls) > 0 { + finalTableSqls = append(finalTableSqls, tableSqls...) } - }(schema) - - go func(schema string) { - defer wg.Done() - var views []string - if at.ap.Params.GetParam("collect_view").Bool() { - views, err = db.ShowSchemaViews(schema) - if err != nil { - at.logger.Errorf("get schema view fail, error: %s", err) - return - } + } + } + + for _, schema := range schemas { + var views []string + if at.ap.Params.GetParam("collect_view").Bool() { + views, err = at.ShowSchemaViewsForPg(plugin, schema, isSensitive) + if err != nil { + at.logger.Errorf("get schema view fail, error: %s", err) + continue } - for _, view := range views { - viewSqls, err := db.ShowCreateViews(at.ap.InstanceDatabase, schema, view) - if err != nil { - at.logger.Errorf("show create view fail, error: %s", err) - return - } - viewMutex.Lock() - if len(viewSqls) > 0 { - finalViewSqls = append(finalViewSqls, viewSqls...) - } - viewMutex.Unlock() + } + for _, view := range views { + viewSqls, err := at.ShowCreateViewsForPg(plugin, at.ap.InstanceDatabase, schema, view, isSensitive) + if err != nil { + at.logger.Errorf("show create view fail, error: %s", err) + continue + } + if len(viewSqls) > 0 { + finalViewSqls = append(finalViewSqls, viewSqls...) } - }(schema) + } } - wg.Wait() if len(finalTableSqls) > 0 { sqls = append(sqls, finalTableSqls...) @@ -1676,6 +1666,188 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { } } +func (at *PostgreSQLSchemaMetaTask) GetCaseSensitiveForPg(plugin driver.Plugin) bool { + sql := "SELECT setting FROM pg_settings WHERE name = 'quote_all_identifiers'" + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + + result, err := plugin.Query(ctx, sql, &driverV2.QueryConf{TimeOutSecond: 120}) + if err != nil { + at.logger.Errorf("get caseSensitive fail, error: %v", err) + return false + } + rows := result.Rows + for _, row := range rows { + values := row.Values + if len(values) == 0 { + continue + } + caseSensitive := values[0].Value + if strings.ToLower(caseSensitive) == "on" { + return true + } + } + return false +} + +func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin) ([]string, error) { + querySql := "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname != 'information_schema'" + return at.GetResultSqls(plugin, querySql) +} + +func (at *PostgreSQLSchemaMetaTask) ShowSchemaTablesForPg(plugin driver.Plugin, schema string, isSensitive bool) ([]string, error) { + querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ + " where table_schema='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) + if isSensitive { + schema = strings.ToLower(schema) + querySql = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ + " where lower(table_schema)='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) + } + return at.GetResultSqls(plugin, querySql) +} + +func (at *PostgreSQLSchemaMetaTask) ShowSchemaViewsForPg(plugin driver.Plugin, schema string, isSensitive bool) ([]string, error) { + querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ + " where table_schema='%s' and TABLE_TYPE='VIEW'", schema) + if isSensitive { + schema = strings.ToLower(schema) + querySql = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ + "where lower(table_schema)='%s' and TABLE_TYPE='VIEW'", schema) + } + return at.GetResultSqls(plugin, querySql) +} + +func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, database, schema, tableName string, isSensitive bool) ([]string, error) { + tables := make([]string, 0) + tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName) + if isSensitive { + database = strings.ToLower(database) + schema = strings.ToLower(schema) + tableName = strings.ToLower(tableName) + } + columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", + database, schema, tableName) + if isSensitive { + columnsCondition = fmt.Sprintf("lower(table_catalog) = '%s' AND lower(table_schema) = '%s' "+ + "AND lower(table_name) = '%s'", database, schema, tableName) + } + // 获取列定义,多个英文逗号分割 + columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ + "CASE "+ + " WHEN data_type IN ('char', 'varchar', 'character', 'character varying', 'text') "+ + " THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+ + " WHEN data_type IN ('numeric', 'decimal') "+ + " THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+ + " WHEN data_type IN ('integer', 'smallint', 'bigint') THEN data_type "+ + " ELSE data_type "+ + " END "+ + " || "+ + " CASE "+ + " WHEN column_default != '' THEN ' DEFAULT ' || column_default ELSE '' END "+ + " || "+ + " CASE "+ + " WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+ + " FROM information_schema.columns "+ + " WHERE %s GROUP BY table_name", columnsCondition) + sqls, err := at.GetResultSqls(plugin, columns) + if err != nil { + at.logger.Errorf("search column definition error:%s\n", err) + return nil, err + } + if len(sqls) == 0 { + return tables, nil + } + tableDDl += strings.Join(sqls, "") + constraintsCondition := fmt.Sprintf("n.nspname = '%s' AND C.relname = '%s'", schema, tableName) + if isSensitive { + constraintsCondition = fmt.Sprintf("lower(n.nspname) = '%s' "+ + "AND lower(C.relname) = '%s'", schema, tableName) + } + // 获取所有约束 + constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+ + " pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+ + " FROM pg_catalog.pg_constraint r "+ + " JOIN pg_catalog.pg_class C ON C.OID = r.conrelid "+ + " JOIN pg_catalog.pg_namespace n ON n.OID = C.relnamespace "+ + " WHERE %s", constraintsCondition) + sqls, err = at.GetResultSqls(plugin, constraints) + if err != nil { + at.logger.Errorf("search constraint definition error:%s\n", err) + return nil, err + } + for _, sqlContext := range sqls { + tableDDl += ",\n" + sqlContext + } + tableDDl += ")" + indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName) + if isSensitive { + indexesCondition = fmt.Sprintf("lower(schemaname) = '%s' and lower(tablename) = '%s'", + schema, tableName) + } + // 获取索引 + indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+ + " WHERE %s", indexesCondition) + sqls, err = at.GetResultSqls(plugin, indexes) + if err != nil { + at.logger.Errorf("search index definition error:%s\n", err) + return nil, err + } + for _, sqlContent := range sqls { + if strings.Contains(sqlContent, "CREATE UNIQUE INDEX") { + continue + } + tableDDl += ";\n" + sqlContent + } + tables = append(tables, tableDDl) + return tables, nil +} + +func (at *PostgreSQLSchemaMetaTask) ShowCreateViewsForPg(plugin driver.Plugin, database, schema, tableName string, isSensitive bool) ([]string, error) { + querySql := fmt.Sprintf( + "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ + " AS create_view_statement "+ + " FROM information_schema.views "+ + " WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", + database, schema, tableName) + if isSensitive { + database = strings.ToLower(database) + tableName = strings.ToLower(tableName) + querySql = fmt.Sprintf( + "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ + " AS create_view_statement "+ + " FROM information_schema.views "+ + " WHERE lower(table_catalog) = '%s' AND lower(table_schema) = '%s' AND lower(table_name) = '%s'", + database, schema, tableName) + } + return at.GetResultSqls(plugin, querySql) +} + +func (at *PostgreSQLSchemaMetaTask) GetResultSqls(plugin driver.Plugin, sql string) ([]string, error) { + var ret []string + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + + result, err := plugin.Query(ctx, sql, &driverV2.QueryConf{TimeOutSecond: 120}) + if err != nil { + at.logger.Errorf("plugin.Query() failed:%s\n", err) + return nil, err + } + rows := result.Rows + for _, row := range rows { + values := row.Values + if len(values) == 0 { + continue + } + sqlContent := values[0].Value + if len(sqlContent) == 0 { + continue + } + ret = append(ret, sqlContent) + } + return ret, nil +} + func (at *PostgreSQLSchemaMetaTask) Audit() (*AuditResultResp, error) { task, err := getTaskWithInstanceByAuditPlan(at.ap, at.persist) if err != nil { From 3bc076edd5a2c6a1a204ca45b3ac0eff385342ac Mon Sep 17 00:00:00 2001 From: zhsy Date: Sat, 16 Mar 2024 22:57:11 +0800 Subject: [PATCH 04/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9text=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=E7=9A=84=E6=8B=AC=E5=8F=B7=EF=BC=8Ctext()=E6=94=B9=E4=B8=BAtex?= =?UTF-8?q?t?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index 37cfe095f6..ce4c339d0c 100644 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1735,11 +1735,11 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, // 获取列定义,多个英文逗号分割 columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ "CASE "+ - " WHEN data_type IN ('char', 'varchar', 'character', 'character varying', 'text') "+ + " WHEN data_type IN ('char', 'varchar', 'character', 'character varying') "+ " THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+ " WHEN data_type IN ('numeric', 'decimal') "+ " THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+ - " WHEN data_type IN ('integer', 'smallint', 'bigint') THEN data_type "+ + " WHEN data_type IN ('integer', 'smallint', 'bigint', 'text') THEN data_type "+ " ELSE data_type "+ " END "+ " || "+ From 3e2014aca86dbbe73a6a2b1addec683f9bd8408a Mon Sep 17 00:00:00 2001 From: zhangsuyun Date: Sat, 16 Mar 2024 23:21:27 +0800 Subject: [PATCH 05/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=E7=9A=84=E5=88=A4=E6=96=AD=EF=BC=8Cdata=5Ftype=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E4=B8=BAlower(data=5Ftype)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) mode change 100644 => 100755 sqle/server/auditplan/task.go diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go old mode 100644 new mode 100755 index 6e3d23a1a1..13e4e08b72 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1583,7 +1583,8 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { pluginMgr := driver.GetPluginManager() if !pluginMgr.IsOptionalModuleEnabled(instance.DbType, driverV2.OptionalModuleQuery) { - at.logger.Errorf("collect pg schema meta failed: %v", driver.NewErrPluginAPINotImplement(driverV2.OptionalModuleQuery)) + at.logger.Errorf("collect pg schema meta failed: %v", + driver.NewErrPluginAPINotImplement(driverV2.OptionalModuleQuery)) return } plugin, err := pluginMgr.OpenPlugin(at.logger, instance.DbType, &driverV2.Config{DSN: &driverV2.DSN{ @@ -1612,7 +1613,6 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { return } - sqls := make([]string, 0) finalTableSqls := make([]string, 0) finalViewSqls := make([]string, 0) for _, schema := range schemas { @@ -1631,6 +1631,12 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { finalTableSqls = append(finalTableSqls, tableSqls...) } } + if len(finalTableSqls) > 0 { + err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalTableSqls, schema)) + if err != nil { + at.logger.Errorf("save table schema meta to storage fail, error: %s", err) + } + } } for _, schema := range schemas { @@ -1652,20 +1658,11 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { finalViewSqls = append(finalViewSqls, viewSqls...) } } - } - - if len(finalTableSqls) > 0 { - sqls = append(sqls, finalTableSqls...) - } - - if len(finalViewSqls) > 0 { - sqls = append(sqls, finalViewSqls...) - } - - if len(sqls) > 0 { - err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(sqls)) - if err != nil { - at.logger.Errorf("save schema meta to storage fail, error: %s", err) + if len(finalViewSqls) > 0 { + err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalViewSqls, schema)) + if err != nil { + at.logger.Errorf("save view schema meta to storage fail, error: %s", err) + } } } } @@ -1739,11 +1736,11 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, // 获取列定义,多个英文逗号分割 columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ "CASE "+ - " WHEN data_type IN ('char', 'varchar', 'character', 'character varying') "+ + " WHEN lower(data_type) IN ('char', 'varchar', 'character', 'character varying') "+ " THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+ - " WHEN data_type IN ('numeric', 'decimal') "+ + " WHEN lower(data_type) IN ('numeric', 'decimal') "+ " THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+ - " WHEN data_type IN ('integer', 'smallint', 'bigint', 'text') THEN data_type "+ + " WHEN lower(data_type) IN ('integer', 'smallint', 'bigint', 'text') THEN data_type "+ " ELSE data_type "+ " END "+ " || "+ From 63911ebdb64de879fd5d5f40f93a494a22d62f5a Mon Sep 17 00:00:00 2001 From: zhangsuyun2022 <107415572+zhangsuyun2022@users.noreply.github.com> Date: Fri, 22 Mar 2024 13:57:26 +0800 Subject: [PATCH 06/15] Update task.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修改top sql的大小写判断 --- sqle/server/auditplan/task.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index 13e4e08b72..c95c847784 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1700,7 +1700,7 @@ func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin) ([]s func (at *PostgreSQLSchemaMetaTask) ShowSchemaTablesForPg(plugin driver.Plugin, schema string, isSensitive bool) ([]string, error) { querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ " where table_schema='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) - if isSensitive { + if !isSensitive { schema = strings.ToLower(schema) querySql = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ " where lower(table_schema)='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) @@ -1711,7 +1711,7 @@ func (at *PostgreSQLSchemaMetaTask) ShowSchemaTablesForPg(plugin driver.Plugin, func (at *PostgreSQLSchemaMetaTask) ShowSchemaViewsForPg(plugin driver.Plugin, schema string, isSensitive bool) ([]string, error) { querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ " where table_schema='%s' and TABLE_TYPE='VIEW'", schema) - if isSensitive { + if !isSensitive { schema = strings.ToLower(schema) querySql = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ "where lower(table_schema)='%s' and TABLE_TYPE='VIEW'", schema) @@ -1722,14 +1722,14 @@ func (at *PostgreSQLSchemaMetaTask) ShowSchemaViewsForPg(plugin driver.Plugin, s func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, database, schema, tableName string, isSensitive bool) ([]string, error) { tables := make([]string, 0) tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName) - if isSensitive { + if !isSensitive { database = strings.ToLower(database) schema = strings.ToLower(schema) tableName = strings.ToLower(tableName) } columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", database, schema, tableName) - if isSensitive { + if !isSensitive { columnsCondition = fmt.Sprintf("lower(table_catalog) = '%s' AND lower(table_schema) = '%s' "+ "AND lower(table_name) = '%s'", database, schema, tableName) } @@ -1761,7 +1761,7 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, } tableDDl += strings.Join(sqls, "") constraintsCondition := fmt.Sprintf("n.nspname = '%s' AND C.relname = '%s'", schema, tableName) - if isSensitive { + if !isSensitive { constraintsCondition = fmt.Sprintf("lower(n.nspname) = '%s' "+ "AND lower(C.relname) = '%s'", schema, tableName) } @@ -1782,7 +1782,7 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, } tableDDl += ")" indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName) - if isSensitive { + if !isSensitive { indexesCondition = fmt.Sprintf("lower(schemaname) = '%s' and lower(tablename) = '%s'", schema, tableName) } @@ -1811,7 +1811,7 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateViewsForPg(plugin driver.Plugin, d " FROM information_schema.views "+ " WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", database, schema, tableName) - if isSensitive { + if !isSensitive { database = strings.ToLower(database) tableName = strings.ToLower(tableName) querySql = fmt.Sprintf( From 80bbc4e027852baf04fc3392f5a332321b305123 Mon Sep 17 00:00:00 2001 From: zhangsuyun2022 <107415572+zhangsuyun2022@users.noreply.github.com> Date: Fri, 22 Mar 2024 17:43:04 +0800 Subject: [PATCH 07/15] Update task.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 去掉大小写敏感的判断和代码实现 --- sqle/server/auditplan/task.go | 81 ++++------------------------------- 1 file changed, 8 insertions(+), 73 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index c95c847784..d1a44ed7b0 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1600,9 +1600,6 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { return } - // 是否大小写敏感 - isSensitive := at.GetCaseSensitiveForPg(plugin) - schemas, err := at.GetAllUserSchemas(plugin) if err != nil { at.logger.Errorf("get database=%s schemas error: %s", at.ap.InstanceDatabase, err) @@ -1616,13 +1613,13 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { finalTableSqls := make([]string, 0) finalViewSqls := make([]string, 0) for _, schema := range schemas { - tables, err := at.ShowSchemaTablesForPg(plugin, schema, isSensitive) + tables, err := at.ShowSchemaTablesForPg(plugin, schema) if err != nil { at.logger.Errorf("get schema table fail, error: %s", err) continue } for _, table := range tables { - tableSqls, err := at.ShowCreateTablesForPg(plugin, at.ap.InstanceDatabase, schema, table, isSensitive) + tableSqls, err := at.ShowCreateTablesForPg(plugin, at.ap.InstanceDatabase, schema, table) if err != nil { at.logger.Errorf("show create table fail, error: %s", err) continue @@ -1642,14 +1639,14 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { for _, schema := range schemas { var views []string if at.ap.Params.GetParam("collect_view").Bool() { - views, err = at.ShowSchemaViewsForPg(plugin, schema, isSensitive) + views, err = at.ShowSchemaViewsForPg(plugin, schema) if err != nil { at.logger.Errorf("get schema view fail, error: %s", err) continue } } for _, view := range views { - viewSqls, err := at.ShowCreateViewsForPg(plugin, at.ap.InstanceDatabase, schema, view, isSensitive) + viewSqls, err := at.ShowCreateViewsForPg(plugin, at.ap.InstanceDatabase, schema, view) if err != nil { at.logger.Errorf("show create view fail, error: %s", err) continue @@ -1667,72 +1664,28 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { } } -func (at *PostgreSQLSchemaMetaTask) GetCaseSensitiveForPg(plugin driver.Plugin) bool { - sql := "SELECT setting FROM pg_settings WHERE name = 'quote_all_identifiers'" - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) - defer cancel() - - result, err := plugin.Query(ctx, sql, &driverV2.QueryConf{TimeOutSecond: 120}) - if err != nil { - at.logger.Errorf("get caseSensitive fail, error: %v", err) - return false - } - rows := result.Rows - for _, row := range rows { - values := row.Values - if len(values) == 0 { - continue - } - caseSensitive := values[0].Value - if strings.ToLower(caseSensitive) == "on" { - return true - } - } - return false -} - func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin) ([]string, error) { querySql := "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname != 'information_schema'" return at.GetResultSqls(plugin, querySql) } -func (at *PostgreSQLSchemaMetaTask) ShowSchemaTablesForPg(plugin driver.Plugin, schema string, isSensitive bool) ([]string, error) { +func (at *PostgreSQLSchemaMetaTask) ShowSchemaTablesForPg(plugin driver.Plugin, schema string) ([]string, error) { querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ " where table_schema='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) - if !isSensitive { - schema = strings.ToLower(schema) - querySql = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ - " where lower(table_schema)='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) - } return at.GetResultSqls(plugin, querySql) } -func (at *PostgreSQLSchemaMetaTask) ShowSchemaViewsForPg(plugin driver.Plugin, schema string, isSensitive bool) ([]string, error) { +func (at *PostgreSQLSchemaMetaTask) ShowSchemaViewsForPg(plugin driver.Plugin, schema string) ([]string, error) { querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ " where table_schema='%s' and TABLE_TYPE='VIEW'", schema) - if !isSensitive { - schema = strings.ToLower(schema) - querySql = fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ - "where lower(table_schema)='%s' and TABLE_TYPE='VIEW'", schema) - } return at.GetResultSqls(plugin, querySql) } -func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, database, schema, tableName string, isSensitive bool) ([]string, error) { +func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, database, schema, tableName string) ([]string, error) { tables := make([]string, 0) tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName) - if !isSensitive { - database = strings.ToLower(database) - schema = strings.ToLower(schema) - tableName = strings.ToLower(tableName) - } columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", database, schema, tableName) - if !isSensitive { - columnsCondition = fmt.Sprintf("lower(table_catalog) = '%s' AND lower(table_schema) = '%s' "+ - "AND lower(table_name) = '%s'", database, schema, tableName) - } // 获取列定义,多个英文逗号分割 columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ "CASE "+ @@ -1761,10 +1714,6 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, } tableDDl += strings.Join(sqls, "") constraintsCondition := fmt.Sprintf("n.nspname = '%s' AND C.relname = '%s'", schema, tableName) - if !isSensitive { - constraintsCondition = fmt.Sprintf("lower(n.nspname) = '%s' "+ - "AND lower(C.relname) = '%s'", schema, tableName) - } // 获取所有约束 constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+ " pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+ @@ -1782,10 +1731,6 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, } tableDDl += ")" indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName) - if !isSensitive { - indexesCondition = fmt.Sprintf("lower(schemaname) = '%s' and lower(tablename) = '%s'", - schema, tableName) - } // 获取索引 indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+ " WHERE %s", indexesCondition) @@ -1804,23 +1749,13 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, return tables, nil } -func (at *PostgreSQLSchemaMetaTask) ShowCreateViewsForPg(plugin driver.Plugin, database, schema, tableName string, isSensitive bool) ([]string, error) { +func (at *PostgreSQLSchemaMetaTask) ShowCreateViewsForPg(plugin driver.Plugin, database, schema, tableName string) ([]string, error) { querySql := fmt.Sprintf( "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ " AS create_view_statement "+ " FROM information_schema.views "+ " WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", database, schema, tableName) - if !isSensitive { - database = strings.ToLower(database) - tableName = strings.ToLower(tableName) - querySql = fmt.Sprintf( - "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ - " AS create_view_statement "+ - " FROM information_schema.views "+ - " WHERE lower(table_catalog) = '%s' AND lower(table_schema) = '%s' AND lower(table_name) = '%s'", - database, schema, tableName) - } return at.GetResultSqls(plugin, querySql) } From bad92eee3c6b436688b980df293184347bc1d10a Mon Sep 17 00:00:00 2001 From: zhsy Date: Thu, 11 Apr 2024 00:04:06 +0800 Subject: [PATCH 08/15] =?UTF-8?q?=E9=87=87=E7=94=A8=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E6=96=B9=E5=BC=8F=E5=AE=9E=E7=8E=B0=E8=8E=B7?= =?UTF-8?q?=E5=8F=96pg=E5=BA=93=E8=A1=A8=E5=85=83=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 283 ++++++++++++++++++++-------------- 1 file changed, 163 insertions(+), 120 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index d1a44ed7b0..9476e757d6 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1572,7 +1572,7 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { return } if at.ap.InstanceDatabase == "" { - at.logger.Warnf("instance schema is not configured") + at.logger.Warnf("instance database is not configured") return } instance, _, err := dms.GetInstanceInProjectByName(context.Background(), string(at.ap.ProjectId), at.ap.InstanceName) @@ -1599,102 +1599,173 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { at.logger.Errorf("connect to instance fail, error: %v", err) return } + defer pluginMgr.Stop() - schemas, err := at.GetAllUserSchemas(plugin) + // 获取所有的数据库及对应的schema + schemas, err := at.GetAllUserSchemas(plugin, at.ap.InstanceDatabase) if err != nil { - at.logger.Errorf("get database=%s schemas error: %s", at.ap.InstanceDatabase, err) + at.logger.Errorf("get databases=%s schemas fail: %s", at.ap.InstanceDatabase, err) return } - if len(schemas) == 0 { - at.logger.Errorf("get database=%s schemas empty error: %s", at.ap.InstanceDatabase, err) - return + + // 获取表和视图 + tablesAndViews, err := at.GetAllTablesAndViewsForPg(plugin, at.ap.InstanceDatabase) + if err != nil { + at.logger.Errorf("get all table and view fail, error: %s", err) } - finalTableSqls := make([]string, 0) - finalViewSqls := make([]string, 0) - for _, schema := range schemas { - tables, err := at.ShowSchemaTablesForPg(plugin, schema) + // 获取列信息 + columnsInfo, err := at.GetAllColumnsInfoForPg(plugin, at.ap.InstanceDatabase) + if err != nil { + at.logger.Errorf("get all columns information fail, error: %s", err) + } + + // 获取约束信息 + constraints, err := at.GetAllConstraintsForPg(plugin) + if err != nil { + at.logger.Errorf("get all constraints fail, error: %s", err) + } + + // 获取索引信息 + indexes, err := at.GetAllIndexesForPg(plugin) + if err != nil { + at.logger.Errorf("get all indexes fail, error: %s", err) + } + + // 是否收集视图sql + isCollectView := false + if at.ap.Params.GetParam("collect_view").Bool() { + isCollectView = true + } + + var viewsSql [][]string + if isCollectView { + // 获取视图创建sql + viewsSql, err = at.GetAllViewsSqlForPg(plugin, at.ap.InstanceDatabase) if err != nil { - at.logger.Errorf("get schema table fail, error: %s", err) - continue - } - for _, table := range tables { - tableSqls, err := at.ShowCreateTablesForPg(plugin, at.ap.InstanceDatabase, schema, table) - if err != nil { - at.logger.Errorf("show create table fail, error: %s", err) - continue - } - if len(tableSqls) > 0 { - finalTableSqls = append(finalTableSqls, tableSqls...) - } - } - if len(finalTableSqls) > 0 { - err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalTableSqls, schema)) - if err != nil { - at.logger.Errorf("save table schema meta to storage fail, error: %s", err) - } + at.logger.Errorf("get all views sql fail, error: %s", err) } } for _, schema := range schemas { - var views []string - if at.ap.Params.GetParam("collect_view").Bool() { - views, err = at.ShowSchemaViewsForPg(plugin, schema) - if err != nil { - at.logger.Errorf("get schema view fail, error: %s", err) + createTableSqls := make([]string, 0) + createViewSqls := make([]string, 0) + for _, tableOrView := range tablesAndViews { + if len(tableOrView) < 3 { continue } - } - for _, view := range views { - viewSqls, err := at.ShowCreateViewsForPg(plugin, at.ap.InstanceDatabase, schema, view) - if err != nil { - at.logger.Errorf("show create view fail, error: %s", err) + + if tableOrView[0] != schema[0] { // schema[0]:二维数组第0个元素 continue } - if len(viewSqls) > 0 { - finalViewSqls = append(finalViewSqls, viewSqls...) + dataObjectType := "" + tableOrViewName := tableOrView[1] + if tableOrView[2] == "BASE TABLE" || tableOrView[2] == "SYSTEM VIEW" { // 表 + dataObjectType = "table" + } else if tableOrView[2] == "VIEW" { // 视图 + dataObjectType = "view" + } + if dataObjectType == "table" { + tableDDl := createTableSqlForPg(schema, tableOrViewName, columnsInfo, constraints, indexes) + createTableSqls = append(createTableSqls, tableDDl) + } else if dataObjectType == "view" { + if !isCollectView { + continue + } + // 视图sql + for _, viewSql := range viewsSql { + if len(viewSql) < 3 { + continue + } + schemaName, tableName := viewSql[0], viewSql[1] + if schemaName != schema[0] || tableName != tableOrViewName { + continue + } + createViewSqls = append(createViewSqls, viewSql[2]) + } } } - if len(finalViewSqls) > 0 { - err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalViewSqls, schema)) + + finalInsertSqls := make([]string, 0) + if len(createTableSqls) > 0 { + finalInsertSqls = append(finalInsertSqls, createTableSqls...) + } + + if len(createViewSqls) > 0 { + finalInsertSqls = append(finalInsertSqls, createViewSqls...) + } + + if len(finalInsertSqls) > 0 { + err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalInsertSqls, schema[0])) if err != nil { - at.logger.Errorf("save view schema meta to storage fail, error: %s", err) + at.logger.Errorf("save table and view schema meta to storage fail, error: %s", err) } } } } -func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin) ([]string, error) { - querySql := "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname != 'information_schema'" - return at.GetResultSqls(plugin, querySql) +func createTableSqlForPg(schema []string, tableOrViewName string, columnsInfo, constraints, indexes [][]string) string { + tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema[0], tableOrViewName) + // 列信息 + for _, columnInfo := range columnsInfo { + if len(columnInfo) < 3 { + continue + } + schemaName, tableName := columnInfo[0], columnInfo[1] + if schemaName != schema[0] || tableName != tableOrViewName { + continue + } + tableDDl += columnInfo[2] + } + // 约束信息 + for _, constraintInfo := range constraints { + if len(constraintInfo) < 3 { + continue + } + schemaName, tableName := constraintInfo[0], constraintInfo[1] + if schemaName != schema[0] || tableName != tableOrViewName { + continue + } + tableDDl += ",\n" + constraintInfo[2] + } + tableDDl += ")" + // 索引信息 + for _, indexInfo := range indexes { + if len(indexInfo) < 4 { + continue + } + schemaName, tableName := indexInfo[0], indexInfo[1] + if schemaName != schema[0] || tableName != tableOrViewName { + continue + } + tableDDl += ";\n" + indexInfo[3] + } + return tableDDl } -func (at *PostgreSQLSchemaMetaTask) ShowSchemaTablesForPg(plugin driver.Plugin, schema string) ([]string, error) { - querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ - " where table_schema='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema) - return at.GetResultSqls(plugin, querySql) +func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, database string) ([][]string, error) { + querySql := fmt.Sprintf("SELECT schema_name FROM information_schema.schemata"+ + " WHERE catalog_name = '%s'"+ + " AND schema_name NOT LIKE 'pg_%%' AND schema_name != 'information_schema' ORDER BY schema_name", database) + return at.GetResult(plugin, querySql) } -func (at *PostgreSQLSchemaMetaTask) ShowSchemaViewsForPg(plugin driver.Plugin, schema string) ([]string, error) { - querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+ - " where table_schema='%s' and TABLE_TYPE='VIEW'", schema) - return at.GetResultSqls(plugin, querySql) +func (at *PostgreSQLSchemaMetaTask) GetAllTablesAndViewsForPg(plugin driver.Plugin, database string) ([][]string, error) { + querySql := fmt.Sprintf("select table_schema, table_name, table_type from information_schema.tables "+ + " where table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema' "+ + " ORDER BY table_name", database) + return at.GetResult(plugin, querySql) } -func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, database, schema, tableName string) ([]string, error) { - tables := make([]string, 0) - tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName) - columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", - database, schema, tableName) - // 获取列定义,多个英文逗号分割 - columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+ +func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, database string) ([][]string, error) { + columns := fmt.Sprintf("SELECT table_schema, table_name, string_agg(column_name || ' ' || "+ "CASE "+ " WHEN lower(data_type) IN ('char', 'varchar', 'character', 'character varying') "+ " THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+ " WHEN lower(data_type) IN ('numeric', 'decimal') "+ " THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+ " WHEN lower(data_type) IN ('integer', 'smallint', 'bigint', 'text') THEN data_type "+ - " ELSE data_type "+ + " ELSE udt_name "+ " END "+ " || "+ " CASE "+ @@ -1703,64 +1774,36 @@ func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, " CASE "+ " WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+ " FROM information_schema.columns "+ - " WHERE %s GROUP BY table_name", columnsCondition) - sqls, err := at.GetResultSqls(plugin, columns) - if err != nil { - at.logger.Errorf("search column definition error:%s\n", err) - return nil, err - } - if len(sqls) == 0 { - return tables, nil - } - tableDDl += strings.Join(sqls, "") - constraintsCondition := fmt.Sprintf("n.nspname = '%s' AND C.relname = '%s'", schema, tableName) - // 获取所有约束 - constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+ - " pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+ - " FROM pg_catalog.pg_constraint r "+ - " JOIN pg_catalog.pg_class C ON C.OID = r.conrelid "+ - " JOIN pg_catalog.pg_namespace n ON n.OID = C.relnamespace "+ - " WHERE %s", constraintsCondition) - sqls, err = at.GetResultSqls(plugin, constraints) - if err != nil { - at.logger.Errorf("search constraint definition error:%s\n", err) - return nil, err - } - for _, sqlContext := range sqls { - tableDDl += ",\n" + sqlContext - } - tableDDl += ")" - indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName) - // 获取索引 - indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+ - " WHERE %s", indexesCondition) - sqls, err = at.GetResultSqls(plugin, indexes) - if err != nil { - at.logger.Errorf("search index definition error:%s\n", err) - return nil, err - } - for _, sqlContent := range sqls { - if strings.Contains(sqlContent, "CREATE UNIQUE INDEX") { - continue - } - tableDDl += ";\n" + sqlContent - } - tables = append(tables, tableDDl) - return tables, nil + " WHERE table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema' "+ + " GROUP BY table_schema, table_name", database) + return at.GetResult(plugin, columns) +} + +func (at *PostgreSQLSchemaMetaTask) GetAllConstraintsForPg(plugin driver.Plugin) ([][]string, error) { + querySql := fmt.Sprintf("SELECT n.nspname as schema_name, c.relname as table_name, " + + " 'CONSTRAINT ' || r.conname || ' ' || pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition" + + " FROM pg_catalog.pg_constraint r JOIN pg_catalog.pg_class c ON C.OID = r.conrelid " + + " JOIN pg_catalog.pg_namespace n ON n.OID = c.relnamespace " + + " where n.nspname not like 'pg_%%' and n.nspname != 'information_schema'") + return at.GetResult(plugin, querySql) +} + +func (at *PostgreSQLSchemaMetaTask) GetAllIndexesForPg(plugin driver.Plugin) ([][]string, error) { + querySql := "SELECT schemaname, tablename, indexname, indexdef FROM pg_indexes " + + " where schemaname not like 'pg_%' AND schemaname != 'information_schema'" + return at.GetResult(plugin, querySql) } -func (at *PostgreSQLSchemaMetaTask) ShowCreateViewsForPg(plugin driver.Plugin, database, schema, tableName string) ([]string, error) { - querySql := fmt.Sprintf( - "SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+ - " AS create_view_statement "+ - " FROM information_schema.views "+ - " WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'", - database, schema, tableName) - return at.GetResultSqls(plugin, querySql) +func (at *PostgreSQLSchemaMetaTask) GetAllViewsSqlForPg(plugin driver.Plugin, database string) ([][]string, error) { + querySql := fmt.Sprintf("SELECT table_schema, table_name, "+ + " 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition "+ + " AS create_view_statement FROM information_schema.views WHERE table_catalog = '%s' "+ + " AND table_schema not like 'pg_%%' AND table_schema != 'information_schema' order by table_name", database) + return at.GetResult(plugin, querySql) } -func (at *PostgreSQLSchemaMetaTask) GetResultSqls(plugin driver.Plugin, sql string) ([]string, error) { - var ret []string +func (at *PostgreSQLSchemaMetaTask) GetResult(plugin driver.Plugin, sql string) ([][]string, error) { + var ret [][]string ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) defer cancel() @@ -1775,11 +1818,11 @@ func (at *PostgreSQLSchemaMetaTask) GetResultSqls(plugin driver.Plugin, sql stri if len(values) == 0 { continue } - sqlContent := values[0].Value - if len(sqlContent) == 0 { - continue + var valueArr []string + for _, value := range values { + valueArr = append(valueArr, value.Value) } - ret = append(ret, sqlContent) + ret = append(ret, valueArr) } return ret, nil } From 69b8dd34c69a4c2e04ec5d46407336ddeb6c85ce Mon Sep 17 00:00:00 2001 From: zhsy Date: Tue, 16 Apr 2024 23:18:06 +0800 Subject: [PATCH 09/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9pg=E5=BA=93=E8=A1=A8?= =?UTF-8?q?=E5=85=83=E6=95=B0=E6=8D=AE=EF=BC=8C=E4=BF=AE=E6=94=B9=E8=BF=94?= =?UTF-8?q?=E5=9B=9E=E5=80=BC=E4=B8=BA=E7=BB=93=E6=9E=84=E4=BD=93=E6=95=B0?= =?UTF-8?q?=E7=BB=84=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 233 +++++++++++++++++++++++++--------- 1 file changed, 174 insertions(+), 59 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index 9476e757d6..3a9826e7eb 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1552,11 +1552,51 @@ func NewBaiduRdsMySQLSlowLogTask(entry *logrus.Entry, ap *model.AuditPlan) Task return b } -// PostgreSQL库表元数据 +// PostgreSQLSchemaMetaTask : PostgreSQL库表元数据 type PostgreSQLSchemaMetaTask struct { *sqlCollector } +type PostgreSQLSchema struct { + schemaName string +} + +type PostgreSQLTablesAndViews struct { + schemaName string + tableName string + tableType string +} + +type PostgreSQLViewInfo struct { + schemaName string + tableName string + viewSql string +} + +type PostgreSQLCreateTableSql struct { + createTableSql string + createIndexSql string +} + +type PostgreSQLTableColumnInfo struct { + schemaName string + tableName string + columnSql string +} + +type PostgreSQLConstraint struct { + schemaName string + tableName string + constraintDefinition string +} + +type PostgreSQLIndex struct { + schemaName string + tableName string + indexName string + indexDefinition string +} + func NewPostgreSQLSchemaMetaTask(entry *logrus.Entry, ap *model.AuditPlan) Task { sqlCollector := newSQLCollector(entry, ap) task := &PostgreSQLSchemaMetaTask{ @@ -1638,7 +1678,7 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { isCollectView = true } - var viewsSql [][]string + var viewsSql []*PostgreSQLViewInfo if isCollectView { // 获取视图创建sql viewsSql, err = at.GetAllViewsSqlForPg(plugin, at.ap.InstanceDatabase) @@ -1647,117 +1687,131 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { } } + auditPlanSQLV2Slice := make([]*model.AuditPlanSQLV2, 0) for _, schema := range schemas { + currentSchema := schema.schemaName createTableSqls := make([]string, 0) createViewSqls := make([]string, 0) for _, tableOrView := range tablesAndViews { - if len(tableOrView) < 3 { - continue - } - - if tableOrView[0] != schema[0] { // schema[0]:二维数组第0个元素 + if tableOrView.schemaName != currentSchema { continue } dataObjectType := "" - tableOrViewName := tableOrView[1] - if tableOrView[2] == "BASE TABLE" || tableOrView[2] == "SYSTEM VIEW" { // 表 + tableOrViewName := tableOrView.tableName + if tableOrView.tableType == "BASE TABLE" || tableOrView.tableType == "SYSTEM VIEW" { // 表 dataObjectType = "table" - } else if tableOrView[2] == "VIEW" { // 视图 + } else if tableOrView.tableType == "VIEW" { // 视图 dataObjectType = "view" } if dataObjectType == "table" { - tableDDl := createTableSqlForPg(schema, tableOrViewName, columnsInfo, constraints, indexes) + createDDL := createTableSqlForPg(currentSchema, tableOrViewName, columnsInfo, constraints, indexes) + tableDDl := fmt.Sprintf("%s;\n%s", createDDL.createTableSql, createDDL.createIndexSql) createTableSqls = append(createTableSqls, tableDDl) } else if dataObjectType == "view" { if !isCollectView { continue } // 视图sql - for _, viewSql := range viewsSql { - if len(viewSql) < 3 { - continue - } - schemaName, tableName := viewSql[0], viewSql[1] - if schemaName != schema[0] || tableName != tableOrViewName { + for _, view := range viewsSql { + schemaName, tableName := view.schemaName, view.tableName + if schemaName != schema.schemaName || tableName != tableOrViewName { continue } - createViewSqls = append(createViewSqls, viewSql[2]) + createViewSqls = append(createViewSqls, view.viewSql) } } } - finalInsertSqls := make([]string, 0) if len(createTableSqls) > 0 { - finalInsertSqls = append(finalInsertSqls, createTableSqls...) + auditPlanSQLV2Slice = append(auditPlanSQLV2Slice, convertRawSQLToModelSQLs(createTableSqls, currentSchema)...) } if len(createViewSqls) > 0 { - finalInsertSqls = append(finalInsertSqls, createViewSqls...) + auditPlanSQLV2Slice = append(auditPlanSQLV2Slice, convertRawSQLToModelSQLs(createViewSqls, currentSchema)...) } + } - if len(finalInsertSqls) > 0 { - err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalInsertSqls, schema[0])) - if err != nil { - at.logger.Errorf("save table and view schema meta to storage fail, error: %s", err) - } + if len(auditPlanSQLV2Slice) > 0 { + err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, auditPlanSQLV2Slice) + if err != nil { + at.logger.Errorf("save table and view schema meta to storage fail, error: %s", err) } } } -func createTableSqlForPg(schema []string, tableOrViewName string, columnsInfo, constraints, indexes [][]string) string { - tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema[0], tableOrViewName) +func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreSQLTableColumnInfo, constraints []*PostgreSQLConstraint, indexes []*PostgreSQLIndex) *PostgreSQLCreateTableSql { + tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableOrViewName) // 列信息 for _, columnInfo := range columnsInfo { - if len(columnInfo) < 3 { - continue - } - schemaName, tableName := columnInfo[0], columnInfo[1] - if schemaName != schema[0] || tableName != tableOrViewName { + schemaName, tableName := columnInfo.schemaName, columnInfo.tableName + if schemaName != schema || tableName != tableOrViewName { continue } - tableDDl += columnInfo[2] + tableDDl += columnInfo.columnSql } // 约束信息 for _, constraintInfo := range constraints { - if len(constraintInfo) < 3 { + schemaName, tableName := constraintInfo.schemaName, constraintInfo.tableName + if schemaName != schema || tableName != tableOrViewName { continue } - schemaName, tableName := constraintInfo[0], constraintInfo[1] - if schemaName != schema[0] || tableName != tableOrViewName { - continue - } - tableDDl += ",\n" + constraintInfo[2] + tableDDl += ",\n" + constraintInfo.constraintDefinition } tableDDl += ")" + indexDDl := "" // 索引信息 for _, indexInfo := range indexes { - if len(indexInfo) < 4 { - continue - } - schemaName, tableName := indexInfo[0], indexInfo[1] - if schemaName != schema[0] || tableName != tableOrViewName { + schemaName, tableName := indexInfo.schemaName, indexInfo.tableName + if schemaName != schema || tableName != tableOrViewName { continue } - tableDDl += ";\n" + indexInfo[3] + indexDDl += indexInfo.indexDefinition + } + return &PostgreSQLCreateTableSql{ + createTableSql: tableDDl, + createIndexSql: indexDDl, } - return tableDDl } -func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, database string) ([][]string, error) { +func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, database string) ([]*PostgreSQLSchema, error) { + result := make([]*PostgreSQLSchema, 0) querySql := fmt.Sprintf("SELECT schema_name FROM information_schema.schemata"+ " WHERE catalog_name = '%s'"+ " AND schema_name NOT LIKE 'pg_%%' AND schema_name != 'information_schema' ORDER BY schema_name", database) - return at.GetResult(plugin, querySql) + res, err := at.GetResult(plugin, querySql) + if err != nil { + return result, err + } + // res是二维数组,例如:[[{"postgres"}][{"test"}]] + for _, value := range res { + result = append(result, &PostgreSQLSchema{value[0]}) + } + return result, nil } -func (at *PostgreSQLSchemaMetaTask) GetAllTablesAndViewsForPg(plugin driver.Plugin, database string) ([][]string, error) { +func (at *PostgreSQLSchemaMetaTask) GetAllTablesAndViewsForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTablesAndViews, error) { querySql := fmt.Sprintf("select table_schema, table_name, table_type from information_schema.tables "+ " where table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema' "+ " ORDER BY table_name", database) - return at.GetResult(plugin, querySql) + result := make([]*PostgreSQLTablesAndViews, 0) + ret, err := at.GetResult(plugin, querySql) + if err != nil { + return result, err + } + for _, value := range ret { + if len(value) < 3 { + return result, fmt.Errorf("get tables and views error, column length is not three") + } + result = append(result, &PostgreSQLTablesAndViews{ + schemaName: value[0], + tableName: value[1], + tableType: value[2], + }) + } + return result, nil } -func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, database string) ([][]string, error) { +func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTableColumnInfo, error) { columns := fmt.Sprintf("SELECT table_schema, table_name, string_agg(column_name || ' ' || "+ "CASE "+ " WHEN lower(data_type) IN ('char', 'varchar', 'character', 'character varying') "+ @@ -1776,30 +1830,91 @@ func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, " FROM information_schema.columns "+ " WHERE table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema' "+ " GROUP BY table_schema, table_name", database) - return at.GetResult(plugin, columns) + result := make([]*PostgreSQLTableColumnInfo, 0) + ret, err := at.GetResult(plugin, columns) + if err != nil { + return result, err + } + for _, value := range ret { + if len(value) < 3 { + return result, fmt.Errorf("get column info error, column length is not three") + } + result = append(result, &PostgreSQLTableColumnInfo{ + schemaName: value[0], + tableName: value[1], + columnSql: value[2], + }) + } + return result, nil } -func (at *PostgreSQLSchemaMetaTask) GetAllConstraintsForPg(plugin driver.Plugin) ([][]string, error) { +func (at *PostgreSQLSchemaMetaTask) GetAllConstraintsForPg(plugin driver.Plugin) ([]*PostgreSQLConstraint, error) { querySql := fmt.Sprintf("SELECT n.nspname as schema_name, c.relname as table_name, " + " 'CONSTRAINT ' || r.conname || ' ' || pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition" + " FROM pg_catalog.pg_constraint r JOIN pg_catalog.pg_class c ON C.OID = r.conrelid " + " JOIN pg_catalog.pg_namespace n ON n.OID = c.relnamespace " + " where n.nspname not like 'pg_%%' and n.nspname != 'information_schema'") - return at.GetResult(plugin, querySql) + result := make([]*PostgreSQLConstraint, 0) + ret, err := at.GetResult(plugin, querySql) + if err != nil { + return result, err + } + for _, value := range ret { + if len(value) < 3 { + return result, fmt.Errorf("get constraint error, column length is not three") + } + result = append(result, &PostgreSQLConstraint{ + schemaName: value[0], + tableName: value[1], + constraintDefinition: value[2], + }) + } + return result, nil } -func (at *PostgreSQLSchemaMetaTask) GetAllIndexesForPg(plugin driver.Plugin) ([][]string, error) { +func (at *PostgreSQLSchemaMetaTask) GetAllIndexesForPg(plugin driver.Plugin) ([]*PostgreSQLIndex, error) { querySql := "SELECT schemaname, tablename, indexname, indexdef FROM pg_indexes " + " where schemaname not like 'pg_%' AND schemaname != 'information_schema'" - return at.GetResult(plugin, querySql) + result := make([]*PostgreSQLIndex, 0) + ret, err := at.GetResult(plugin, querySql) + if err != nil { + return result, err + } + for _, value := range ret { + if len(value) < 4 { + return result, fmt.Errorf("get index error, column length is not four") + } + result = append(result, &PostgreSQLIndex{ + schemaName: value[0], + tableName: value[1], + indexName: value[2], + indexDefinition: value[3], + }) + } + return result, nil } -func (at *PostgreSQLSchemaMetaTask) GetAllViewsSqlForPg(plugin driver.Plugin, database string) ([][]string, error) { +func (at *PostgreSQLSchemaMetaTask) GetAllViewsSqlForPg(plugin driver.Plugin, database string) ([]*PostgreSQLViewInfo, error) { querySql := fmt.Sprintf("SELECT table_schema, table_name, "+ " 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition "+ " AS create_view_statement FROM information_schema.views WHERE table_catalog = '%s' "+ " AND table_schema not like 'pg_%%' AND table_schema != 'information_schema' order by table_name", database) - return at.GetResult(plugin, querySql) + result := make([]*PostgreSQLViewInfo, 0) + ret, err := at.GetResult(plugin, querySql) + if err != nil { + return result, err + } + for _, value := range ret { + if len(value) < 3 { + return result, fmt.Errorf("get view sql error, column length is not three") + } + result = append(result, &PostgreSQLViewInfo{ + schemaName: value[0], + tableName: value[1], + viewSql: value[2], + }) + } + return result, nil } func (at *PostgreSQLSchemaMetaTask) GetResult(plugin driver.Plugin, sql string) ([][]string, error) { From a53e5e971aacefc81d5b672bde6609bc6e012263 Mon Sep 17 00:00:00 2001 From: zhsy Date: Mon, 22 Apr 2024 18:31:18 +0800 Subject: [PATCH 10/15] =?UTF-8?q?=E4=BA=8C=E7=BB=B4=E6=95=B0=E7=BB=84?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=A9=BA=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index 3a9826e7eb..b9b8275cf6 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1782,10 +1782,15 @@ func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, data if err != nil { return result, err } - // res是二维数组,例如:[[{"postgres"}][{"test"}]] for _, value := range res { + if len(value) == 0 { + continue + } result = append(result, &PostgreSQLSchema{value[0]}) } + if len(result) == 0 { + return result, fmt.Errorf("database=%s has no schema", database) + } return result, nil } From 8a49c4ebb020c9cc9da77f8334ba676f6aca3f26 Mon Sep 17 00:00:00 2001 From: zhsy2 Date: Thu, 27 Jun 2024 23:57:45 +0800 Subject: [PATCH 11/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9sql=E7=9A=84=E5=86=99?= =?UTF-8?q?=E6=B3=95=EF=BC=8C=E6=94=B9||=E4=B8=BAconcat=20=E6=94=B9+?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E4=B8=BAconcat=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 86 ++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index b9b8275cf6..b5534156f7 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1,5 +1,6 @@ package auditplan +import "C" import ( "bytes" "context" @@ -1740,7 +1741,7 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { } func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreSQLTableColumnInfo, constraints []*PostgreSQLConstraint, indexes []*PostgreSQLIndex) *PostgreSQLCreateTableSql { - tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableOrViewName) + tableDDl := fmt.Sprintf(`CREATE TABLE %s.%s(`, schema, tableOrViewName) // 列信息 for _, columnInfo := range columnsInfo { schemaName, tableName := columnInfo.schemaName, columnInfo.tableName @@ -1775,9 +1776,10 @@ func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreS func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, database string) ([]*PostgreSQLSchema, error) { result := make([]*PostgreSQLSchema, 0) - querySql := fmt.Sprintf("SELECT schema_name FROM information_schema.schemata"+ - " WHERE catalog_name = '%s'"+ - " AND schema_name NOT LIKE 'pg_%%' AND schema_name != 'information_schema' ORDER BY schema_name", database) + querySql := fmt.Sprintf(` + SELECT schema_name FROM information_schema.schemata + WHERE catalog_name = '%s' + AND schema_name NOT LIKE 'pg_%' AND schema_name != 'information_schema' ORDER BY schema_name`, database) res, err := at.GetResult(plugin, querySql) if err != nil { return result, err @@ -1795,9 +1797,9 @@ func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, data } func (at *PostgreSQLSchemaMetaTask) GetAllTablesAndViewsForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTablesAndViews, error) { - querySql := fmt.Sprintf("select table_schema, table_name, table_type from information_schema.tables "+ - " where table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema' "+ - " ORDER BY table_name", database) + querySql := fmt.Sprintf(`select table_schema, table_name, table_type from information_schema.tables + where table_catalog = '%s' and table_schema not like 'pg_%' AND table_schema != 'information_schema' + ORDER BY table_name`, database) result := make([]*PostgreSQLTablesAndViews, 0) ret, err := at.GetResult(plugin, querySql) if err != nil { @@ -1817,24 +1819,27 @@ func (at *PostgreSQLSchemaMetaTask) GetAllTablesAndViewsForPg(plugin driver.Plug } func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTableColumnInfo, error) { - columns := fmt.Sprintf("SELECT table_schema, table_name, string_agg(column_name || ' ' || "+ - "CASE "+ - " WHEN lower(data_type) IN ('char', 'varchar', 'character', 'character varying') "+ - " THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+ - " WHEN lower(data_type) IN ('numeric', 'decimal') "+ - " THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+ - " WHEN lower(data_type) IN ('integer', 'smallint', 'bigint', 'text') THEN data_type "+ - " ELSE udt_name "+ - " END "+ - " || "+ - " CASE "+ - " WHEN column_default != '' THEN ' DEFAULT ' || column_default ELSE '' END "+ - " || "+ - " CASE "+ - " WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+ - " FROM information_schema.columns "+ - " WHERE table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema' "+ - " GROUP BY table_schema, table_name", database) + columns := fmt.Sprintf(`select table_schema, table_name, + string_agg( + concat( + column_name, ' ', + case + when lower(data_type) in ('char', 'varchar', 'character', 'character varying') then concat(data_type, '(', coalesce(character_maximum_length, 0), ')') + when lower(data_type) in ('numeric', 'decimal') then concat(data_type, '(', coalesce(numeric_precision, 0), ',', coalesce(numeric_scale, 0), ')') + when lower(data_type) in ('integer', 'smallint', 'bigint', 'text') then data_type + else udt_name + end, + case + when column_default != '' then concat(' default ', column_default) else '' + end, + case + when is_nullable = 'no' then ' not null' else '' + end + ), ',\n ' order by ordinal_position + ) as columns_sql + from information_schema.columns + where table_catalog = '%s' and table_schema not like 'pg_%' and table_schema != 'information_schema' + group by table_schema, table_name`, database) result := make([]*PostgreSQLTableColumnInfo, 0) ret, err := at.GetResult(plugin, columns) if err != nil { @@ -1854,11 +1859,17 @@ func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, } func (at *PostgreSQLSchemaMetaTask) GetAllConstraintsForPg(plugin driver.Plugin) ([]*PostgreSQLConstraint, error) { - querySql := fmt.Sprintf("SELECT n.nspname as schema_name, c.relname as table_name, " + - " 'CONSTRAINT ' || r.conname || ' ' || pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition" + - " FROM pg_catalog.pg_constraint r JOIN pg_catalog.pg_class c ON C.OID = r.conrelid " + - " JOIN pg_catalog.pg_namespace n ON n.OID = c.relnamespace " + - " where n.nspname not like 'pg_%%' and n.nspname != 'information_schema'") + querySql := `select + n.nspname as schema_name, + c.relname as table_name, + concat ( 'constraint ', r.conname, ' ', pg_catalog.pg_get_constraintdef ( r.oid, true ) ) as constraint_definition + from + pg_catalog.pg_constraint r + join pg_catalog.pg_class c on c.oid = r.conrelid + join pg_catalog.pg_namespace n on n.oid = c.relnamespace + where + n.nspname not like'pg_%' + and n.nspname != 'information_schema'` result := make([]*PostgreSQLConstraint, 0) ret, err := at.GetResult(plugin, querySql) if err != nil { @@ -1878,8 +1889,8 @@ func (at *PostgreSQLSchemaMetaTask) GetAllConstraintsForPg(plugin driver.Plugin) } func (at *PostgreSQLSchemaMetaTask) GetAllIndexesForPg(plugin driver.Plugin) ([]*PostgreSQLIndex, error) { - querySql := "SELECT schemaname, tablename, indexname, indexdef FROM pg_indexes " + - " where schemaname not like 'pg_%' AND schemaname != 'information_schema'" + querySql := `select schemaname, tablename, indexname, indexdef from pg_indexes + where schemaname not like 'pg_%' and schemaname != 'information_schema'` result := make([]*PostgreSQLIndex, 0) ret, err := at.GetResult(plugin, querySql) if err != nil { @@ -1900,10 +1911,13 @@ func (at *PostgreSQLSchemaMetaTask) GetAllIndexesForPg(plugin driver.Plugin) ([] } func (at *PostgreSQLSchemaMetaTask) GetAllViewsSqlForPg(plugin driver.Plugin, database string) ([]*PostgreSQLViewInfo, error) { - querySql := fmt.Sprintf("SELECT table_schema, table_name, "+ - " 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition "+ - " AS create_view_statement FROM information_schema.views WHERE table_catalog = '%s' "+ - " AND table_schema not like 'pg_%%' AND table_schema != 'information_schema' order by table_name", database) + querySql := fmt.Sprintf(`select table_schema, table_name, + concat('create or replace view ', table_schema, '.', table_name, ' as ', view_definition) as create_view_statement + from information_schema.views + where table_catalog = '%s' + and table_schema not like 'pg_%' + and table_schema != 'information_schema' + order by table_name`, database) result := make([]*PostgreSQLViewInfo, 0) ret, err := at.GetResult(plugin, querySql) if err != nil { From 2e52705fa0e487dbd4e6a114b0b26e1c4d206f25 Mon Sep 17 00:00:00 2001 From: zhsy2 Date: Fri, 28 Jun 2024 13:06:06 +0800 Subject: [PATCH 12/15] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index b5534156f7..1efcdebe2d 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1780,10 +1780,12 @@ func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, data SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '%s' AND schema_name NOT LIKE 'pg_%' AND schema_name != 'information_schema' ORDER BY schema_name`, database) + fmt.Printf("GetAllUserSchemas-->querySql:%s\n", querySql) res, err := at.GetResult(plugin, querySql) if err != nil { return result, err } + fmt.Printf("GetAllUserSchemas->error:%+v\n", err) for _, value := range res { if len(value) == 0 { continue From 87195ace521e5685df97078227b5881aaf94635c Mon Sep 17 00:00:00 2001 From: zhsy Date: Fri, 28 Jun 2024 14:10:41 +0800 Subject: [PATCH 13/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9sql=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index 1efcdebe2d..5f3628e637 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1779,7 +1779,7 @@ func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, data querySql := fmt.Sprintf(` SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '%s' - AND schema_name NOT LIKE 'pg_%' AND schema_name != 'information_schema' ORDER BY schema_name`, database) + AND schema_name NOT LIKE 'pg_%%' AND schema_name != 'information_schema' ORDER BY schema_name`, database) fmt.Printf("GetAllUserSchemas-->querySql:%s\n", querySql) res, err := at.GetResult(plugin, querySql) if err != nil { @@ -1800,7 +1800,7 @@ func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, data func (at *PostgreSQLSchemaMetaTask) GetAllTablesAndViewsForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTablesAndViews, error) { querySql := fmt.Sprintf(`select table_schema, table_name, table_type from information_schema.tables - where table_catalog = '%s' and table_schema not like 'pg_%' AND table_schema != 'information_schema' + where table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema' ORDER BY table_name`, database) result := make([]*PostgreSQLTablesAndViews, 0) ret, err := at.GetResult(plugin, querySql) @@ -1840,7 +1840,7 @@ func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, ), ',\n ' order by ordinal_position ) as columns_sql from information_schema.columns - where table_catalog = '%s' and table_schema not like 'pg_%' and table_schema != 'information_schema' + where table_catalog = '%s' and table_schema not like 'pg_%%' and table_schema != 'information_schema' group by table_schema, table_name`, database) result := make([]*PostgreSQLTableColumnInfo, 0) ret, err := at.GetResult(plugin, columns) @@ -1917,7 +1917,7 @@ func (at *PostgreSQLSchemaMetaTask) GetAllViewsSqlForPg(plugin driver.Plugin, da concat('create or replace view ', table_schema, '.', table_name, ' as ', view_definition) as create_view_statement from information_schema.views where table_catalog = '%s' - and table_schema not like 'pg_%' + and table_schema not like 'pg_%%' and table_schema != 'information_schema' order by table_name`, database) result := make([]*PostgreSQLViewInfo, 0) From 99aaea1a698fc7103204d500c11161f43f2c99ab Mon Sep 17 00:00:00 2001 From: zhsy Date: Fri, 28 Jun 2024 17:12:32 +0800 Subject: [PATCH 14/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9sql=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index 5f3628e637..f223573032 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1706,7 +1706,7 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { } if dataObjectType == "table" { createDDL := createTableSqlForPg(currentSchema, tableOrViewName, columnsInfo, constraints, indexes) - tableDDl := fmt.Sprintf("%s;\n%s", createDDL.createTableSql, createDDL.createIndexSql) + tableDDl := fmt.Sprintf("%s;%s", createDDL.createTableSql, createDDL.createIndexSql) createTableSqls = append(createTableSqls, tableDDl) } else if dataObjectType == "view" { if !isCollectView { @@ -1741,7 +1741,7 @@ func (at *PostgreSQLSchemaMetaTask) collectorDo() { } func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreSQLTableColumnInfo, constraints []*PostgreSQLConstraint, indexes []*PostgreSQLIndex) *PostgreSQLCreateTableSql { - tableDDl := fmt.Sprintf(`CREATE TABLE %s.%s(`, schema, tableOrViewName) + tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableOrViewName) // 列信息 for _, columnInfo := range columnsInfo { schemaName, tableName := columnInfo.schemaName, columnInfo.tableName @@ -1756,7 +1756,7 @@ func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreS if schemaName != schema || tableName != tableOrViewName { continue } - tableDDl += ",\n" + constraintInfo.constraintDefinition + tableDDl += fmt.Sprintf(",%s\n", constraintInfo.constraintDefinition) } tableDDl += ")" indexDDl := "" @@ -1766,7 +1766,7 @@ func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreS if schemaName != schema || tableName != tableOrViewName { continue } - indexDDl += indexInfo.indexDefinition + indexDDl += fmt.Sprintf("%s;\n", indexInfo.indexDefinition) } return &PostgreSQLCreateTableSql{ createTableSql: tableDDl, @@ -1780,12 +1780,10 @@ func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, data SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '%s' AND schema_name NOT LIKE 'pg_%%' AND schema_name != 'information_schema' ORDER BY schema_name`, database) - fmt.Printf("GetAllUserSchemas-->querySql:%s\n", querySql) res, err := at.GetResult(plugin, querySql) if err != nil { return result, err } - fmt.Printf("GetAllUserSchemas->error:%+v\n", err) for _, value := range res { if len(value) == 0 { continue @@ -1837,7 +1835,7 @@ func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, case when is_nullable = 'no' then ' not null' else '' end - ), ',\n ' order by ordinal_position + ), ', ' order by ordinal_position ) as columns_sql from information_schema.columns where table_catalog = '%s' and table_schema not like 'pg_%%' and table_schema != 'information_schema' From 86f29c2980b50fac23811b363f1c5c219135f13c Mon Sep 17 00:00:00 2001 From: zhsy Date: Mon, 1 Jul 2024 10:50:29 +0800 Subject: [PATCH 15/15] =?UTF-8?q?=E4=BF=AE=E6=94=B9sql=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/task.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go index f223573032..8bd32b593f 100755 --- a/sqle/server/auditplan/task.go +++ b/sqle/server/auditplan/task.go @@ -1777,8 +1777,7 @@ func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreS func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, database string) ([]*PostgreSQLSchema, error) { result := make([]*PostgreSQLSchema, 0) querySql := fmt.Sprintf(` - SELECT schema_name FROM information_schema.schemata - WHERE catalog_name = '%s' + SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '%s' AND schema_name NOT LIKE 'pg_%%' AND schema_name != 'information_schema' ORDER BY schema_name`, database) res, err := at.GetResult(plugin, querySql) if err != nil {