From da3d9d9fc6688b7ac010fefed75d1ce1bdecf550 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 3 Feb 2024 10:26:33 -0300 Subject: [PATCH] add mariadb [nt] --- connection/connection.go | 5 + database/database.go | 4 +- database/database_mysql.go | 27 ++- database/schemata.go | 2 +- database/templates/mariadb.yaml | 393 ++++++++++++++++++++++++++++++++ dbio_types.go | 8 +- 6 files changed, 432 insertions(+), 7 deletions(-) create mode 100644 database/templates/mariadb.yaml diff --git a/connection/connection.go b/connection/connection.go index 0fcd612..b1e0b12 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -436,6 +436,11 @@ func (c *Connection) setURL() (err error) { setIfMissing("password", "") setIfMissing("port", c.Type.DefPort()) template = "mysql://{username}:{password}@{host}:{port}/{database}" + case dbio.TypeDbMariaDB: + setIfMissing("username", c.Data["user"]) + setIfMissing("password", "") + setIfMissing("port", c.Type.DefPort()) + template = "mariadb://{username}:{password}@{host}:{port}/{database}" case dbio.TypeDbBigQuery: setIfMissing("dataset", c.Data["schema"]) setIfMissing("schema", c.Data["dataset"]) diff --git a/database/database.go b/database/database.go index c0eee45..fd7f65e 100755 --- a/database/database.go +++ b/database/database.go @@ -274,6 +274,8 @@ func NewConnContext(ctx context.Context, URL string, props ...string) (Connectio conn = &MsSQLServerConn{URL: URL} } else if strings.HasPrefix(URL, "mysql:") { conn = &MySQLConn{URL: URL} + } else if strings.HasPrefix(URL, "mariadb:") { + conn = &MySQLConn{URL: URL} } else if strings.HasPrefix(URL, "oracle:") { conn = &OracleConn{URL: URL} // concurrency = 2 @@ -320,7 +322,7 @@ func getDriverName(dbType dbio.Type) (driverName string) { switch dbType { case dbio.TypeDbPostgres, dbio.TypeDbRedshift: driverName = "postgres" - case dbio.TypeDbMySQL: + case dbio.TypeDbMySQL, dbio.TypeDbMariaDB: driverName = "mysql" case dbio.TypeDbOracle: driverName = "godror" diff --git a/database/database_mysql.go b/database/database_mysql.go index 5e6d248..3c596d9 100755 --- a/database/database_mysql.go +++ b/database/database_mysql.go @@ -15,7 +15,7 @@ import ( "github.com/xo/dburl" ) -// MySQLConn is a Postgres connection +// MySQLConn is a MySQL or MariaDB connection type MySQLConn struct { BaseConn URL string @@ -28,6 +28,10 @@ func (conn *MySQLConn) Init() error { conn.BaseConn.Type = dbio.TypeDbMySQL conn.BaseConn.defaultPort = 3306 + if strings.HasPrefix(conn.URL, "maria") { + conn.BaseConn.Type = dbio.TypeDbMariaDB + } + // Turn off Bulk export for now // the LoadDataOutFile needs special circumstances conn.BaseConn.SetProp("allow_bulk_export", "false") @@ -229,11 +233,28 @@ func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFie return } + srcT, err := ParseTableName(srcTable, conn.GetType()) + if err != nil { + err = g.Error(err, "could not generate parse srcTable") + return + } + + tgtT, err := ParseTableName(tgtTable, conn.GetType()) + if err != nil { + err = g.Error(err, "could not generate parse tgtTable") + return + } + + // replace src & tgt to make compatible to MariaDB + // see https://github.com/slingdata-io/sling-cli/issues/135 + upsertMap["src_tgt_pk_equal"] = strings.ReplaceAll(upsertMap["src_tgt_pk_equal"], "src.", srcT.NameQ()+".") + upsertMap["src_tgt_pk_equal"] = strings.ReplaceAll(upsertMap["src_tgt_pk_equal"], "tgt.", tgtT.NameQ()+".") + sqlTemplate := ` - DELETE FROM {tgt_table} tgt + DELETE FROM {tgt_table} WHERE EXISTS ( SELECT 1 - FROM {src_table} src + FROM {src_table} WHERE {src_tgt_pk_equal} ) ; diff --git a/database/schemata.go b/database/schemata.go index 2d81dd7..8cf95ab 100644 --- a/database/schemata.go +++ b/database/schemata.go @@ -434,7 +434,7 @@ func ParseColumnName(text string, dialect dbio.Type) (colName string, err error) func GetQualifierQuote(dialect dbio.Type) string { quote := `"` switch dialect { - case dbio.TypeDbMySQL, dbio.TypeDbBigQuery, dbio.TypeDbClickhouse: + case dbio.TypeDbMySQL, dbio.TypeDbMariaDB, dbio.TypeDbBigQuery, dbio.TypeDbClickhouse: quote = "`" case dbio.TypeDbBigTable: quote = "" diff --git a/database/templates/mariadb.yaml b/database/templates/mariadb.yaml new file mode 100644 index 0000000..d9b6273 --- /dev/null +++ b/database/templates/mariadb.yaml @@ -0,0 +1,393 @@ +core: + drop_table: drop table if exists {table} + drop_view: drop table if exists {view} + create_table: create table if not exists {table} ({col_types}) + create_index: create index {index} on {table} ({cols}) + insert: insert into {table} ({fields}) values ({values}) + update: update {table} set {set_fields} where {pk_fields_equal} + alter_columns: alter table {table} modify {col_ddl} + modify_column: '{column} {type}' + +metadata: + current_database: select database() as name from dual + + databases: select database() as name from dual + + schemas: | + select schema_name + from information_schema.schemata + order by schema_name + + tables: | + select table_name + from information_schema.tables + where table_type = 'BASE TABLE' + and table_schema = '{schema}' + order by table_name + + views: | + select table_name + from information_schema.tables + where table_type = 'VIEW' + and table_schema = '{schema}' + order by table_name + + columns: | + select column_name, data_type + from information_schema.columns + where table_schema = '{schema}' + and table_name = '{table}' + order by ordinal_position + + primary_keys: | + select tco.constraint_name as pk_name, + kcu.ordinal_position as position, + kcu.column_name as column_name + from information_schema.table_constraints tco + join information_schema.key_column_usage kcu + on kcu.constraint_catalog = tco.constraint_catalog + and kcu.constraint_schema = tco.constraint_schema + and kcu.table_schema = tco.table_schema + and kcu.table_name = tco.table_name + where kcu.table_schema = '{schema}' + and kcu.table_name = '{table}' + order by kcu.table_schema, + kcu.table_name, + position + + indexes: | + select + index_name as index_name, + column_name as column_name + from information_schema.statistics + where table_schema = '{schema}' + and table_name = '{table}' + order by + index_name, + seq_in_index + + columns_full: | + with tables as ( + select + table_catalog, + table_schema, + table_name, + case table_type + when 'VIEW' then true + else false + end as is_view + from information_schema.tables + where table_schema = '{schema}' and table_name = '{table}' + ) + select + cols.table_schema as schema_name, + cols.table_name as table_name, + cols.column_name as column_name, + cols.data_type as data_type, + cols.ordinal_position as position + from information_schema.columns cols + join tables + on tables.table_catalog = cols.table_catalog + and tables.table_schema = cols.table_schema + and tables.table_name = cols.table_name + order by cols.table_catalog, cols.table_schema, cols.table_name, cols.ordinal_position + + schemata: | + with tables as ( + select + table_catalog, + table_schema, + table_name, + case table_type + when 'VIEW' then true + else false + end as is_view + from information_schema.tables + where 1=1 + {{if .schema -}} and table_schema = '{schema}' {{- end}} + {{if .tables -}} and table_name in ({tables}) {{- end}} + ) + select + cols.table_schema as schema_name, + cols.table_name as table_name, + tables.is_view as is_view, + cols.column_name as column_name, + cols.data_type as data_type, + cols.ordinal_position as position + from information_schema.columns cols + join tables + on tables.table_catalog = cols.table_catalog + and tables.table_schema = cols.table_schema + and tables.table_name = cols.table_name + order by cols.table_catalog, cols.table_schema, cols.table_name, cols.ordinal_position + + ddl_table: SHOW CREATE TABLE `{schema}`.`{table}` + ddl_view: SHOW CREATE TABLE `{schema}`.`{table}` + +analysis: + # table level + table_count: | + -- table_count {table} + select + '{schema}' as schema_nm, + '{table}' as table_nm, + count(*) cnt + from `{schema}`.`{table}` + + field_chars: | + -- field_chars {table} + select + '{schema}' as schema_nm, + '{table}' as table_nm, + '{field}' as field, + sum(case when regexp_like({field}, '\\n') then 1 else 0 end) as cnt_nline, + sum(case when regexp_like({field}, '\\t') then 1 else 0 end) as cnt_tab, + sum(case when regexp_like({field}, ',') then 1 else 0 end) as cnt_comma, + sum(case when regexp_like({field}, '"') then 1 else 0 end) as cnt_dquote + from `{schema}`.`{table}` + + field_pk_test: | + -- field_pk_test {table} + select + '`{schema}`.`{table}`' as table_nm, + case when count(*) = count(distinct {field}) then 'PASS' else 'FAIL' end as result, + count(*) as tot_cnt, + count(distinct {field}) as dstct_cnt + from `{schema}`.`{table}` + + field_stat: | + -- field_stat {field} + select + '{schema}' as schema_nm, + '{table}' as table_nm, + '{field}' as field, + count(*) as tot_cnt, + count({field}) as f_cnt, + count(*) - count({field}) as f_null_cnt, + round(100.0 * (count(*) - count({field})) / count(*), 1) as f_null_prct, + count(distinct {field}) as f_dstct_cnt, + round(100.0 * count(distinct {field}) / count(*), 1) as f_dstct_prct, + count(*) - count(distinct {field}) as f_dup_cnt + from `{schema}`.`{table}` + + field_stat_group: | + -- field_stat_group {field} + select + '{schema}' as schema_nm, + '{table}' as table_nm, + {group_expr} as group_exp, + '{field}' as field, + count(*) as tot_cnt, + count({field}) as f_cnt, + count(*) - count({field}) as f_null_cnt, + round(100.0 * (count(*) - count({field})) / count(*), 1) as f_null_prct, + count(distinct {field}) as f_dstct_cnt, + round(100.0 * count(distinct {field}) / count(*), 1) as f_dstct_prct, + count(*) - count(distinct {field}) as f_dup_cnt + from `{schema}`.`{table}` + group by {group_expr} + + field_stat_deep: | + -- field_stat_deep {field} + select + '{schema}' as schema_nm, + '{table}' as table_nm, + '{field}' as field, + '{type}' as type, + count(*) as tot_cnt, + count({field}) as f_cnt, + count(*) - count({field}) as f_null_cnt, + round(100.0 * (count(*) - count({field})) / count(*), 1) as f_null_prct, + count(distinct {field}) as f_dstct_cnt, + round(100.0 * count(distinct {field}) / count(*), 1) as f_dstct_prct, + count(*) - count(distinct {field}) as f_dup_cnt, + cast(min({field}) as char(4000)) as f_min, + cast(max({field}) as char(4000)) as f_max, + min(length({field})) as f_min_len, + max(length({field})) as f_max_len + from `{schema}`.`{table}` + + fill_cnt_group_field: | + -- fill_cnt_group_field {field} + select + {field}, + {columns_sql} + from `{schema}`.`{table}` + group by {field} + order by {field} + + fill_rate_group_field: | + -- fill_rate_group_field {field} + select + {field}, + {fill_rate_fields_sql} + from `{schema}`.`{table}` + group by {field} + order by {field} + + distro_field: | + -- distro_field {field} + with t1 as ( + select + '{field}' as field, + {field}, + count(*) cnt + from `{schema}`.`{table}` + group by {field} + order by count(*) desc + ) + , t2 as ( + select + '{field}' as field, + count(*) ttl_cnt + from `{schema}`.`{table}` + ) + select + '{table}' as table_nm, + t1.field, + {field} as value, + cnt, + round(100.0 * cnt / ttl_cnt, 2) as prct + from t1 + join t2 + on t1.field = t2.field + order by cnt desc + + distro_field_group: | + -- distro_field_group {field} + with t1 as ( + select + '{field}' as field, + {group_expr} as group_exp, + {field}, + count(*) cnt + from `{schema}`.`{table}` + group by {field}, {group_expr} + order by count(*) desc + ) + , t2 as ( + select + '{field}' as field, + count(*) ttl_cnt + from `{schema}`.`{table}` + ) + select + '{table}' as table_nm, + t1.field, + t1.group_exp, + {field} as value, + cnt, + round(100.0 * cnt / ttl_cnt, 2) as prct + from t1 + join t2 + on t1.field = t2.field + order by cnt desc + + distro_field_date: | + -- distro_field_date {field} + with t1 as ( + select + '{field}' as field, + year({field}) as year, + month({field}) as month, + count(*) cnt + from `{schema}`.`{table}` + group by year({field}), month({field}) + order by year({field}), month({field}) + ) + , t2 as ( + select '{field}' as field, count(*) ttl_cnt + from `{schema}`.`{table}` + ) + select + '{schema}' as schema_nm, + '{table}' as table_nm, + t1.field, + t1.year, + t1.month, + cnt, + round(100.0 * cnt / ttl_cnt, 2) as prct + from t1 + join t2 + on t1.field = t2.field + order by t1.year, t1.month + + distro_field_date_wide: | + -- distro_field_date {table} + select + '{date_field}' as date_field, + year({date_field}) as year, + month({date_field}) as month, + {columns_sql} + from `{schema}`.`{table}` + {where_clause} + group by year({date_field}), month({date_field}) + order by year({date_field}), month({date_field}) + + test_pk: | + select + '`{schema}`.`{table}`' as table_nm, + '{fields_exp}' as expression, + case when count(*) = count(distinct {fields_exp}) then 'PASS' else 'FAIL' end as pk_result, + count(*) as tot_cnt, + count(distinct {fields_exp}) as expression_cnt, + count(*) - count(distinct {fields_exp}) as delta_cnt, + sum(case when {fields_exp} is null then 1 else 0 end) as null_cnt + from `{schema}`.`{table}` + {where_clause} + +routine: + number_min_max: | + select + count(*) as tot_cnt, + count({field}) as field_cnt, + min({field}) as min_val, + max({field}) as max_val + from `{schema}`.`{table}` + + number_trunc_min_max: | + select + {fields} + from `{schema}`.`{table}` + where {where} + (({partition_col_trunc} >= '{min_val}' + and {partition_col_trunc} <= '{max_val}') + {or_null}) + + date_trunc_uniques: | + select + {partition_col_trunc} as day_field, + count(*) cnt + from `{schema}`.`{table}` + {where} + group by {partition_col_trunc} + order by {partition_col_trunc} + + number_trunc_uniques: | + select + {partition_col_trunc} as trunc_field, + count(*) cnt + from `{schema}`.`{table}` + {where} + group by {partition_col_trunc} + order by {partition_col_trunc} + +function: + replace: replace({string_expr}, {to_replace}, {replacement}) + str_utf8: '{ field }' + fill_cnt_field: count({field}) as cnt_{field} + fill_rate_field: round(100.0 * count({field}) / count(*), 2) as prct_{field} + sleep: select sleep({seconds}) + checksum_decimal: 'abs(truncate({field}, 0))' + checksum_datetime: cast((UNIX_TIMESTAMP({field}) * 1000000) as UNSIGNED) + checksum_boolean: '{field}' + +variable: + bind_string: "?" + quote_char: '`' + ddl_col: 1 + batch_rows: 500 + bool_as: integer + +error_filter: + table_not_exist: exist diff --git a/dbio_types.go b/dbio_types.go index 87f2c34..34be305 100644 --- a/dbio_types.go +++ b/dbio_types.go @@ -40,6 +40,7 @@ const ( TypeDbPostgres Type = "postgres" TypeDbRedshift Type = "redshift" TypeDbMySQL Type = "mysql" + TypeDbMariaDB Type = "mariadb" TypeDbOracle Type = "oracle" TypeDbBigTable Type = "bigtable" TypeDbBigQuery Type = "bigquery" @@ -79,7 +80,7 @@ func ValidateType(tStr string) (Type, bool) { switch t { case TypeFileLocal, TypeFileS3, TypeFileAzure, TypeFileGoogle, TypeFileSftp, - TypeDbPostgres, TypeDbRedshift, TypeDbMySQL, TypeDbOracle, TypeDbBigQuery, TypeDbSnowflake, TypeDbSQLite, TypeDbSQLServer, TypeDbAzure, TypeDbAzureDWH, TypeDbDuckDb, TypeDbMotherDuck: + TypeDbPostgres, TypeDbRedshift, TypeDbMySQL, TypeDbMariaDB, TypeDbOracle, TypeDbBigQuery, TypeDbSnowflake, TypeDbSQLite, TypeDbSQLServer, TypeDbAzure, TypeDbAzureDWH, TypeDbDuckDb, TypeDbMotherDuck: return t, true } @@ -101,6 +102,7 @@ func (t Type) DefPort() int { TypeDbPostgres: 5432, TypeDbRedshift: 5439, TypeDbMySQL: 3306, + TypeDbMariaDB: 3306, TypeDbOracle: 1521, TypeDbSQLServer: 1433, TypeDbAzure: 1433, @@ -118,7 +120,7 @@ func (t Type) DBNameUpperCase() bool { // Kind returns the kind of connection func (t Type) Kind() Kind { switch t { - case TypeDbPostgres, TypeDbRedshift, TypeDbMySQL, TypeDbOracle, TypeDbBigQuery, TypeDbBigTable, + case TypeDbPostgres, TypeDbRedshift, TypeDbMySQL, TypeDbMariaDB, TypeDbOracle, TypeDbBigQuery, TypeDbBigTable, TypeDbSnowflake, TypeDbSQLite, TypeDbSQLServer, TypeDbAzure, TypeDbClickhouse, TypeDbDuckDb, TypeDbMotherDuck: return KindDatabase case TypeFileLocal, TypeFileHDFS, TypeFileS3, TypeFileAzure, TypeFileGoogle, TypeFileSftp, TypeFileHTTP, Type("https"): @@ -176,6 +178,7 @@ func (t Type) NameLong() string { TypeDbPostgres: "DB - PostgreSQL", TypeDbRedshift: "DB - Redshift", TypeDbMySQL: "DB - MySQL", + TypeDbMariaDB: "DB - MariaDB", TypeDbOracle: "DB - Oracle", TypeDbBigQuery: "DB - BigQuery", TypeDbBigTable: "DB - BigTable", @@ -213,6 +216,7 @@ func (t Type) Name() string { TypeDbPostgres: "PostgreSQL", TypeDbRedshift: "Redshift", TypeDbMySQL: "MySQL", + TypeDbMariaDB: "MariaDB", TypeDbOracle: "Oracle", TypeDbBigQuery: "BigQuery", TypeDbBigTable: "BigTable",