Skip to content

Commit

Permalink
Getting Relation columns and tables to fill JSON map (#461)
Browse files Browse the repository at this point in the history
* This commit will add processors

* This commit will update the readme file

* This commit will add new faker

* This commit will update the logic and edit the readme

* This commit will add client queries

* This commit will add mapper functions

* This commit will alter the query of the table

* This commit will edit the condition in add column
  • Loading branch information
AAVision authored Aug 14, 2024
1 parent ffcafaf commit 616fe48
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 2 deletions.
44 changes: 44 additions & 0 deletions db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,50 @@ func GetAllSchemaColumns(db *sql.DB) (*sql.Rows, error) {
return rows, nil
}

// GetTablesName will return a rows of tables name.
func GetTablesName(db *sql.DB) (*sql.Rows, error) {
query := `
SELECT table_name, table_schema
FROM information_schema.tables
WHERE table_type='BASE TABLE'
AND table_schema NOT IN ('information_schema', 'pg_catalog');
`

rows, err := db.Query(query)

if err != nil {
log.Error(err)
return nil, err
}
return rows, nil
}

// GetRelationalColumn will return a row pointer to a list of table and column names relations.
func GetRelationalColumns(db *sql.DB, tables string) (*sql.Rows, error) {
query := `
SELECT
tc.table_schema,
tc.table_name, kcu.column_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name, tc.constraint_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE constraint_type = 'FOREIGN KEY' AND tc.table_name IN
` + tables + ";"

rows, err := db.Query(query)

if err != nil {
log.Error(err)
return nil, err
}
return rows, nil
}

// GetAllTablesInSchema will return a list of database tables for a given database configuration.
func GetAllTablesInSchema(conf PGConfig, schema string) ([]string, error) {
var (
Expand Down
83 changes: 81 additions & 2 deletions mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"strings"
"time"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -209,9 +210,17 @@ func findColumn(columns []ColumnMapper, columnName, tableName, schemaPrefix, sch

// addColumn creates a ColumnMapper structure based on the input parameters.
func addColumn(columnName, tableName, schema, dataType string, ordinalPosition int,
isNullable bool) ColumnMapper {
isNullable bool, relationRows []map[string]interface{}) ColumnMapper {
col := ColumnMapper{}

for _, value := range relationRows {
if value["table_name"] == tableName && value["column_name"] == columnName && value["table_schema"] == schema {
col.ParentTable = value["foreign_table_name"].(string)
col.ParentColumn = value["foreign_column_name"].(string)
col.ParentSchema = value["table_schema"].(string)
}
}

col.Processors = []ProcessorDefinition{
{
Name: "Identity",
Expand All @@ -227,6 +236,55 @@ func addColumn(columnName, tableName, schema, dataType string, ordinalPosition i
return col
}

func ProcessRowToMap(rows *sql.Rows) []map[string]interface{} {
returnedColumns, err := rows.Columns()

scanArgs := make([]interface{}, len(returnedColumns))
values := make([]interface{}, len(returnedColumns))

results := make([]map[string]interface{}, 0)

for i := range values {
scanArgs[i] = &values[i]
}

for rows.Next() {
err = rows.Scan(scanArgs...)
if err != nil {
panic(err)
}

record := make(map[string]interface{})

for i, col := range values {
if col != nil {
switch col.(type) {
case bool:
record[returnedColumns[i]] = col.(bool)
case int:
record[returnedColumns[i]] = col.(int)
case int64:
record[returnedColumns[i]] = col.(int64)
case float64:
record[returnedColumns[i]] = col.(float64)
case string:
record[returnedColumns[i]] = col.(string)
case time.Time:
record[returnedColumns[i]] = col.(time.Time)
case []byte:
record[returnedColumns[i]] = string(col.([]byte))
default:
record[returnedColumns[i]] = col
}
}
}

results = append(results, record)
}

return results
}

// mapColumns
func mapColumns(db *sql.DB, columns []ColumnMapper, schemaPrefix, schema string,
excludeTables []string) ([]ColumnMapper, error) {
Expand Down Expand Up @@ -284,6 +342,27 @@ func mapColumns(db *sql.DB, columns []ColumnMapper, schemaPrefix, schema string,
}
defer rows.Close()

tablesNameRow, err := GetTablesName(db)

var tableName string
tableCollectionString := ""

for tablesNameRow.Next() {
err = tablesNameRow.Scan(
&tableName,
)

tableCollectionString = tableCollectionString + "'" + tableName + "',"
}

tableCollectionString = "(" + tableCollectionString[:len(tableCollectionString)-1] + ")"
defer tablesNameRow.Close()

relationRows, err := GetRelationalColumns(db, tableCollectionString)

defer relationRows.Close()

processedRowToMap := ProcessRowToMap(relationRows)
log.Debug("Iterating through rows and creating skeleton map")
for {
var (
Expand Down Expand Up @@ -334,7 +413,7 @@ func mapColumns(db *sql.DB, columns []ColumnMapper, schemaPrefix, schema string,
// add to the column map
col = findColumn(columns, columnName, tableName, schemaPrefix, schema, dataType)
if col.TableSchema == "" && col.ColumnName == "" {
col = addColumn(columnName, tableName, schema, dataType, ordinalPosition, isNullable)
col = addColumn(columnName, tableName, schema, dataType, ordinalPosition, isNullable, processedRowToMap)
// Continuously append into the column map (old and new together)
columns = append(columns, col)
}
Expand Down

0 comments on commit 616fe48

Please sign in to comment.