Skip to content

Commit

Permalink
Use ELEM table to query unique stations in DATA table
Browse files Browse the repository at this point in the history
  • Loading branch information
Lun4m committed Nov 14, 2024
1 parent 2b5ebf9 commit 954774b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 90 deletions.
113 changes: 37 additions & 76 deletions migrations/kdvh/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"migrate/utils"
)

// List of columns that we do not need to select when extracting the element codes from a KDVH table
var INVALID_COLUMNS = []string{"dato", "stnr", "typeid", "season", "xxx"}

type DumpConfig struct {
BaseDir string `short:"p" long:"path" default:"./dumps/kdvh" description:"Location the dumped data will be stored in"`
Tables []string `short:"t" delimiter:"," long:"table" default:"" description:"Optional comma separated list of table names. By default all available tables are processed"`
Expand Down Expand Up @@ -56,52 +59,41 @@ func (table *Table) Dump(conn *sql.DB, config *DumpConfig) {
return
}

bar := utils.NewBar(len(elements), table.TableName)

// TODO: should be safe to spawn goroutines/waitgroup here with connection pool?
bar.RenderBlank()
for _, element := range elements {
table.dumpElement(element, conn, config)
bar.Add(1)
}
}

// TODO: maybe we don't do this? Or can we use pgdump/copy?
// The problem is that there are no indices on the tables, that's why the queries are super slow
// Dumping the whole table might be a lot faster (for T_MDATA it's ~10 times faster!),
// but it might be more difficult to recover if something goes wrong?
// =>
// copyQuery := fmt.SPrintf("\\copy (select * from t_mdata) TO '%s/%s.csv' WITH CSV HEADER", config.BaseDir, table.TableName)
// cmd := exec.Command("psql", CONN_STRING, "-c", copyQuery)
// cmd.Stderr = &bytes.Buffer{}
// err = cmd.Run()
func (table *Table) dumpElement(element string, conn *sql.DB, config *DumpConfig) {
stations, err := table.getStationsWithElement(element, conn, config)
stations, err := table.getStations(conn, config)
if err != nil {
slog.Error(fmt.Sprintf("Could not fetch stations for table %s: %v", table.TableName, err))
return
}

bar := utils.NewBar(len(stations), table.TableName)

bar.RenderBlank()
for _, station := range stations {
path := filepath.Join(table.Path, string(station))
if err := os.MkdirAll(path, os.ModePerm); err != nil {
slog.Error(err.Error())
return
}

meta := DumpMeta{
element: element,
station: station,
dataTable: table.TableName,
flagTable: table.FlagTableName,
overwrite: config.Overwrite,
logStr: fmt.Sprintf("%s - %s - %s: ", table.TableName, station, element),
}
for _, element := range elements {
err := table.dumpFunc(
path,
DumpMeta{
element: element,
station: station,
dataTable: table.TableName,
flagTable: table.FlagTableName,
overwrite: config.Overwrite,
},
conn,
)

if err := table.dumpFunc(path, meta, conn); err == nil {
// NOTE: Non-nil errors are logged inside each DumpFunc
slog.Info(meta.logStr + "dumped successfully")
if err == nil {
slog.Info(fmt.Sprintf("%s - %s - %s: dumped successfully", table.TableName, station, element))
}
}

bar.Add(1)
}
}

Expand All @@ -121,9 +113,6 @@ func (table *Table) getElements(conn *sql.DB, config *DumpConfig) ([]string, err
return elements, nil
}

// List of columns that we do not need to select when extracting the element codes from a KDVH table
var INVALID_COLUMNS = []string{"dato", "stnr", "typeid", "season", "xxx"}

// Fetch column names for a given table
// We skip the columns defined in INVALID_COLUMNS and all columns that contain the 'kopi' string
// TODO: should we dump these invalid/kopi elements even if we are not importing them?
Expand Down Expand Up @@ -163,57 +152,29 @@ func (table *Table) fetchElements(conn *sql.DB) (elements []string, err error) {
}

// Fetches station numbers and filters them based on user input
func (table *Table) getStationsWithElement(element string, conn *sql.DB, config *DumpConfig) ([]string, error) {
stations, err := table.fetchStationsWithElement(element, conn)
func (table *Table) getStations(conn *sql.DB, config *DumpConfig) ([]string, error) {
stations, err := table.fetchStnrFromElemTable(conn)
if err != nil {
return nil, err
}

msg := fmt.Sprintf("Element '%s'", element) + "not available for station '%s'"
stations = utils.FilterSlice(config.Stations, stations, msg)
stations = utils.FilterSlice(config.Stations, stations, "")
return stations, nil
}

// Fetches the unique station numbers in the table for a given element (and when that element is not null)
// NOTE: splitting by element does make it a bit better, because we avoid quering for stations that have no data or flag for that element?
func (table *Table) fetchStationsWithElement(element string, conn *sql.DB) (stations []string, err error) {
slog.Info(fmt.Sprintf("Fetching station numbers for %s (this can take a while)...", element))

query := fmt.Sprintf(
`SELECT DISTINCT stnr FROM %s WHERE %s IS NOT NULL`,
table.TableName,
element,
)
// This function uses the ELEM table to fetch the station numbers
func (table *Table) fetchStnrFromElemTable(conn *sql.DB) (stations []string, err error) {
slog.Info(fmt.Sprint("Fetching station numbers..."))

rows, err := conn.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()

for rows.Next() {
var stnr string
if err := rows.Scan(&stnr); err != nil {
return nil, err
}
stations = append(stations, stnr)
var rows *sql.Rows
if table.ElemTableName == "T_ELEM_OBS" {
query := `SELECT DISTINCT stnr FROM t_elem_obs WHERE table_name = $1`
rows, err = conn.Query(query, table.TableName)
} else {
query := fmt.Sprintf("SELECT DISTINCT stnr FROM %s", strings.ToLower(table.ElemTableName))
rows, err = conn.Query(query)
}

return stations, rows.Err()
}

// Fetches all unique station numbers in the table
// FIXME: the DISTINCT query can be extremely slow
// NOTE: decided to use fetchStationsWithElement instead
func (table *Table) fetchStationNumbers(conn *sql.DB) (stations []string, err error) {
slog.Info(fmt.Sprint("Fetching station numbers (this can take a while)..."))

query := fmt.Sprintf(
`SELECT DISTINCT stnr FROM %s`,
table.TableName,
)

rows, err := conn.Query(query)
if err != nil {
return nil, err
}
Expand Down
31 changes: 18 additions & 13 deletions migrations/kdvh/dump_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,19 @@ import (
"time"
)

// Format string for date field in CSV files
const TIMEFORMAT string = "2006-01-02_15:04:05"

// Error returned if no observations are found for a (station, element) pair
var EMPTY_QUERY_ERR error = errors.New("The query did not return any rows")

// Struct representing a single record in the output CSV file
type Record struct {
time time.Time
data sql.NullString
flag sql.NullString
}

func fileExists(filename string, overwrite bool) error {
if _, err := os.Stat(filename); err == nil && !overwrite {
return errors.New(
Expand All @@ -28,7 +38,7 @@ func fileExists(filename string, overwrite bool) error {
return nil
}

// Fetch min and max year from table, needed for tables that are dumped by year
// Helper function for dumpByYear functinos Fetch min and max year from table, needed for tables that are dumped by year
func fetchYearRange(tableName, station string, conn *sql.DB) (int64, int64, error) {
var beginStr, endStr string
query := fmt.Sprintf("SELECT min(to_char(dato, 'yyyy')), max(to_char(dato, 'yyyy')) FROM %s WHERE stnr = $1", tableName)
Expand Down Expand Up @@ -253,12 +263,17 @@ func dumpDataAndFlags(path string, meta DumpMeta, conn *sql.DB) error {
}

func dumpToFile(filename string, rows *sql.Rows) error {
file, err := os.Create(filename)
lines, err := sortRows(rows)
if err != nil {
return err
}

lines, err := sortRows(rows)
// Return if query was empty
if len(lines) == 0 {
return EMPTY_QUERY_ERR
}

file, err := os.Create(filename)
if err != nil {
return err
}
Expand All @@ -270,13 +285,6 @@ func dumpToFile(filename string, rows *sql.Rows) error {
return err
}

// Struct representing a single record in the output CSV file
type Record struct {
time time.Time
data sql.NullString
flag sql.NullString
}

// Scans the rows and collects them in a slice of chronologically sorted lines
func sortRows(rows *sql.Rows) ([]Record, error) {
defer rows.Close()
Expand All @@ -299,9 +307,6 @@ func sortRows(rows *sql.Rows) ([]Record, error) {
return records, rows.Err()
}

// Format string for date field in CSV files
const TIMEFORMAT string = "2006-01-02_15:04:05"

// Writes queried (time | data | flag) columns to CSV
func writeElementFile(lines []Record, file io.Writer) error {
// Write number of lines as header
Expand Down
2 changes: 1 addition & 1 deletion migrations/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func FilterSlice[T comparable](slice, reference []T, formatMsg string) []T {
}

if formatMsg == "" {
formatMsg = "User input '%s' not present in reference, skipping"
formatMsg = "Value '%s' not present in reference slice, skipping"
}

// I hate this so much
Expand Down

0 comments on commit 954774b

Please sign in to comment.