From 046151bd048c22f202e94469d7dd742e24852807 Mon Sep 17 00:00:00 2001
From: Manuel Carrer
Date: Fri, 13 Dec 2024 12:10:23 +0100
Subject: [PATCH] Allow specifying time range for import

---
 migrations/kvalobs/db/csv_parsers.go      | 37 ++++++++++++++++++++---
 migrations/kvalobs/db/import_functions.go | 13 ++++----
 migrations/kvalobs/db/table.go            |  2 +-
 migrations/kvalobs/import/import.go       | 11 ++++---
 4 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/migrations/kvalobs/db/csv_parsers.go b/migrations/kvalobs/db/csv_parsers.go
index 1c897a80..ada02d32 100644
--- a/migrations/kvalobs/db/csv_parsers.go
+++ b/migrations/kvalobs/db/csv_parsers.go
@@ -3,13 +3,14 @@ package db
 import (
 	"bufio"
 	"migrate/lard"
+	"migrate/utils"
 	"slices"
 	"strconv"
 	"strings"
 	"time"
 )
 
-func parseDataCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, [][]any, error) {
+func parseDataCSV(tsid int32, rowCount int, timespan *utils.TimeSpan, scanner *bufio.Scanner) ([][]any, [][]any, error) {
 	data := make([][]any, 0, rowCount)
 	flags := make([][]any, 0, rowCount)
 	var originalPtr, correctedPtr *float32
@@ -23,6 +24,13 @@ func parseDataCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, []
 			return nil, nil, err
 		}
 
+		if timespan.From != nil && obstime.Sub(*timespan.From) < 0 {
+			continue
+		}
+		if timespan.To != nil && obstime.Sub(*timespan.To) > 0 {
+			break
+		}
+
 		obsvalue64, err := strconv.ParseFloat(fields[1], 32)
 		if err != nil {
 			return nil, nil, err
@@ -74,7 +82,7 @@ func parseDataCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, []
 }
 
 // Text obs are not flagged
-func parseTextCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, error) {
+func parseTextCSV(tsid int32, rowCount int, timespan *utils.TimeSpan, scanner *bufio.Scanner) ([][]any, error) {
 	data := make([][]any, 0, rowCount)
 	for scanner.Scan() {
 		// obstime, original, tbtime
@@ -85,6 +93,13 @@ func parseTextCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, er
 			return nil, err
 		}
 
+		if timespan.From != nil && obstime.Sub(*timespan.From) < 0 {
+			continue
+		}
+		if timespan.To != nil && obstime.Sub(*timespan.To) > 0 {
+			break
+		}
+
 		lardObs := lard.TextObs{
 			Id:      tsid,
 			Obstime: obstime,
@@ -101,7 +116,7 @@ func parseTextCSV(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, er
 // but should instead be treated as scalars
 // TODO: I'm not sure these params should be scalars given that the other cloud types are not.
 // Should all cloud types be integers or text?
-func parseMetarCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, error) {
+func parseMetarCloudType(tsid int32, rowCount int, timespan *utils.TimeSpan, scanner *bufio.Scanner) ([][]any, error) {
 	data := make([][]any, 0, rowCount)
 	for scanner.Scan() {
 		// obstime, original, tbtime
@@ -112,6 +127,13 @@ func parseMetarCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]
 			return nil, err
 		}
 
+		if timespan.From != nil && obstime.Sub(*timespan.From) < 0 {
+			continue
+		}
+		if timespan.To != nil && obstime.Sub(*timespan.To) > 0 {
+			break
+		}
+
 		val, err := strconv.ParseFloat(fields[1], 32)
 		if err != nil {
 			return nil, err
@@ -134,7 +156,7 @@ func parseMetarCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]
 
 // Function for paramids 305, 306, 307, 308 that were stored as scalar data
 // but should be treated as text
-func parseSpecialCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([][]any, error) {
+func parseSpecialCloudType(tsid int32, rowCount int, timespan *utils.TimeSpan, scanner *bufio.Scanner) ([][]any, error) {
 	data := make([][]any, 0, rowCount)
 	for scanner.Scan() {
 		// obstime, original, tbtime, corrected, controlinfo, useinfo, cfailed
@@ -146,6 +168,13 @@ func parseSpecialCloudType(tsid int32, rowCount int, scanner *bufio.Scanner) ([]
 			return nil, err
 		}
 
+		if timespan.From != nil && obstime.Sub(*timespan.From) < 0 {
+			continue
+		}
+		if timespan.To != nil && obstime.Sub(*timespan.To) > 0 {
+			break
+		}
+
 		lardObs := lard.TextObs{
 			Id:      tsid,
 			Obstime: obstime,
diff --git a/migrations/kvalobs/db/import_functions.go b/migrations/kvalobs/db/import_functions.go
index 0ae183de..5b5a8327 100644
--- a/migrations/kvalobs/db/import_functions.go
+++ b/migrations/kvalobs/db/import_functions.go
@@ -4,6 +4,7 @@ import (
 	"bufio"
 	"log/slog"
 	"migrate/lard"
+	"migrate/utils"
 	"os"
 	"strconv"
 
@@ -18,7 +19,7 @@ import (
 // - only for histkvalobs
 // - 2751, 2752, 2753, 2754 are in `text_data` but should be treated as `data`?
-func importData(tsid int32, label *Label, filename, logStr string, pool *pgxpool.Pool) (int64, error) {
+func importData(tsid int32, label *Label, filename, logStr string, timespan *utils.TimeSpan, pool *pgxpool.Pool) (int64, error) {
 	file, err := os.Open(filename)
 	if err != nil {
 		slog.Error(logStr + err.Error())
 		return 0, err
 	}
@@ -36,7 +37,7 @@ func importData(tsid int32, label *Label, filename, logStr string, pool *pgxpool
 	scanner.Scan()
 
 	if label.IsSpecialCloudType() {
-		text, err := parseSpecialCloudType(tsid, rowCount, scanner)
+		text, err := parseSpecialCloudType(tsid, rowCount, timespan, scanner)
 		if err != nil {
 			slog.Error(logStr + err.Error())
 			return 0, err
 		}
@@ -51,7 +52,7 @@ func importData(tsid int32, label *Label, filename, logStr string, pool *pgxpool
 		return count, nil
 	}
 
-	data, flags, err := parseDataCSV(tsid, rowCount, scanner)
+	data, flags, err := parseDataCSV(tsid, rowCount, timespan, scanner)
 	count, err := lard.InsertData(data, pool, logStr)
 	if err != nil {
 		slog.Error(logStr + err.Error())
@@ -66,7 +67,7 @@ func importData(tsid int32, label *Label, filename, logStr string, pool *pgxpool
 	return count, nil
 }
 
-func importText(tsid int32, label *Label, filename, logStr string, pool *pgxpool.Pool) (int64, error) {
+func importText(tsid int32, label *Label, filename, logStr string, timespan *utils.TimeSpan, pool *pgxpool.Pool) (int64, error) {
 	file, err := os.Open(filename)
 	if err != nil {
 		slog.Error(logStr + err.Error())
@@ -84,7 +85,7 @@ func importText(tsid int32, label *Label, filename, logStr string, pool *pgxpool
 	scanner.Scan()
 
 	if label.IsMetarCloudType() {
-		data, err := parseMetarCloudType(tsid, rowCount, scanner)
+		data, err := parseMetarCloudType(tsid, rowCount, timespan, scanner)
 		if err != nil {
 			slog.Error(logStr + err.Error())
 			return 0, err
 		}
@@ -98,7 +99,7 @@ func importText(tsid int32, label *Label, filename, logStr string, pool *pgxpool
 		return count, nil
 	}
 
-	text, err := parseTextCSV(tsid, rowCount, scanner)
+	text, err := parseTextCSV(tsid, rowCount, timespan, scanner)
 	if err != nil {
 		slog.Error(logStr + err.Error())
 		return 0, err
diff --git a/migrations/kvalobs/db/table.go b/migrations/kvalobs/db/table.go
index 3c76d3dd..942552bb 100644
--- a/migrations/kvalobs/db/table.go
+++ b/migrations/kvalobs/db/table.go
@@ -22,4 +22,4 @@ type LabelDumpFunc func(timespan *utils.TimeSpan, pool *pgxpool.Pool, maxConn in
 type ObsDumpFunc func(label *Label, timespan *utils.TimeSpan, path string, pool *pgxpool.Pool) error
 
 // Lard Import function
-type ImportFunc func(tsid int32, label *Label, filename, logStr string, pool *pgxpool.Pool) (int64, error)
+type ImportFunc func(tsid int32, label *Label, filename, logStr string, timespan *utils.TimeSpan, pool *pgxpool.Pool) (int64, error)
diff --git a/migrations/kvalobs/import/import.go b/migrations/kvalobs/import/import.go
index 853950ce..785b2a75 100644
--- a/migrations/kvalobs/import/import.go
+++ b/migrations/kvalobs/import/import.go
@@ -27,7 +27,8 @@ func ImportTable(table *kvalobs.Table, cache *cache.Cache, pool *pgxpool.Pool, c
 		return 0, err
 	}
 
-	fmt.Printf("Number of stations to dump: %d...\n", len(stations))
+	importTimespan := config.TimeSpan()
+	fmt.Printf("Number of stations to import: %d...\n", len(stations))
 	var rowsInserted int64
 	for _, station := range stations {
 		stnr, err := strconv.ParseInt(station.Name(), 10, 32)
@@ -72,21 +73,23 @@ func ImportTable(table *kvalobs.Table, cache *cache.Cache, pool *pgxpool.Pool, c
 				return
 			}
 
-			timespan, err := cache.GetSeriesTimespan(label)
+			tsTimespan, err := cache.GetSeriesTimespan(label)
 			if err != nil {
 				slog.Error(logStr + err.Error())
 				return
 			}
 
 			// TODO: figure out where to get fromtime, kvalobs directly? Stinfosys?
-			tsid, err := lard.GetTimeseriesID(label.ToLard(), timespan, pool)
+			tsid, err := lard.GetTimeseriesID(label.ToLard(), tsTimespan, pool)
 			if err != nil {
 				slog.Error(logStr + err.Error())
 				return
 			}
 
 			filename := filepath.Join(stationDir, file.Name())
-			count, err := table.Import(tsid, label, filename, logStr, pool)
+			// TODO: it's probably better to dump in different directories
+			// instead of introducing runtime checks
+			count, err := table.Import(tsid, label, filename, logStr, importTimespan, pool)
 			if err != nil {
 				// Logged inside table.Import
 				return
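
Note on the filtering added to the CSV parsers (commentary, not part of the patch): the new checks assume `utils.TimeSpan` exposes optional `From`/`To` bounds as `*time.Time` pointers (nil meaning "unbounded"), and they rely on the dumped rows being ordered by obstime, since the loop skips rows before `From` with `continue` and stops at the first row past `To` with `break`. A minimal standalone sketch of that behaviour, using a hypothetical `TimeSpan` that mirrors only the two fields the patch dereferences:

package main

import (
	"fmt"
	"time"
)

// TimeSpan mirrors the fields the patch dereferences on utils.TimeSpan;
// the real struct may contain more. A nil pointer means "no bound".
type TimeSpan struct {
	From *time.Time
	To   *time.Time
}

// inSpan reproduces the per-row check added to the parsers:
// rows before From are skipped, the first row after To ends the scan.
func inSpan(obstime time.Time, ts *TimeSpan) (keep, stop bool) {
	if ts.From != nil && obstime.Sub(*ts.From) < 0 {
		return false, false // before the range: skip, keep scanning
	}
	if ts.To != nil && obstime.Sub(*ts.To) > 0 {
		return false, true // past the range: stop (rows are time-ordered)
	}
	return true, false
}

func main() {
	from := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	to := time.Date(2020, 12, 31, 23, 59, 59, 0, time.UTC)
	ts := &TimeSpan{From: &from, To: &to}

	for _, obs := range []time.Time{
		time.Date(2019, 6, 1, 0, 0, 0, 0, time.UTC), // skipped
		time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC), // kept
		time.Date(2021, 6, 1, 0, 0, 0, 0, time.UTC), // triggers break
	} {
		keep, stop := inSpan(obs, ts)
		fmt.Println(obs.Format("2006-01-02"), "keep:", keep, "stop:", stop)
		if stop {
			break
		}
	}
}

Because the loop breaks at the first observation past `To`, an unsorted dump file would silently lose rows; the sorted-by-obstime assumption comes from how the dump step writes the CSVs.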