From 78b71730665cc02643f9f3d60f3e8d15efb6fb85 Mon Sep 17 00:00:00 2001
From: Manuel Carrer
Date: Fri, 11 Oct 2024 16:01:24 +0200
Subject: [PATCH] Refactor into different packages (WIP 1)

---
 kdvh_importer/.gitignore                  |   8 +-
 kdvh_importer/README.md                   |  53 ++
 kdvh_importer/dump/kdvh.go                | 320 ++++++++++++
 kdvh_importer/dump/kdvh_dump_functions.go | 292 +++++++++++
 kdvh_importer/dump/main.go                |  69 +++
 kdvh_importer/go.mod                      |   1 +
 kdvh_importer/go.sum                      |   2 +
 kdvh_importer/import/kdvh.go              | 608 ++++++++++++++++++++++
 kdvh_importer/main.go                     |  11 +-
 kdvh_importer/utils/email.go              |  59 +++
 kdvh_importer/utils/utils.go              |  42 ++
 11 files changed, 1458 insertions(+), 7 deletions(-)
 create mode 100644 kdvh_importer/README.md
 create mode 100644 kdvh_importer/dump/kdvh.go
 create mode 100644 kdvh_importer/dump/kdvh_dump_functions.go
 create mode 100644 kdvh_importer/dump/main.go
 create mode 100644 kdvh_importer/import/kdvh.go
 create mode 100644 kdvh_importer/utils/email.go
 create mode 100644 kdvh_importer/utils/utils.go

diff --git a/kdvh_importer/.gitignore b/kdvh_importer/.gitignore
index 61f1f359..dc50a875 100644
--- a/kdvh_importer/.gitignore
+++ b/kdvh_importer/.gitignore
@@ -1,2 +1,6 @@
-!*.go
-!*.csv
+*.txt
+*.sh
+kdvh_importer
+tables/
+test_*/
+.env
diff --git a/kdvh_importer/README.md b/kdvh_importer/README.md
new file mode 100644
index 00000000..ee4bc2f8
--- /dev/null
+++ b/kdvh_importer/README.md
@@ -0,0 +1,53 @@
+# KDVH Importer
+
+Go package used to dump tables from the KDVH database and then import them into LARD.
+
+## Usage
+
+1. Compile it with
+
+   ```terminal
+   go build
+   ```
+
+1. Dump tables from KDVH
+
+   ```terminal
+   ./kdvh_importer dump --help
+   ```
+
+1. Import dumps into LARD
+
+   ```terminal
+   ./kdvh_importer import --help
+   ```
+
+## Useful design notes
+
+Taken from this [talk](https://www.youtube.com/watch?v=wqXqJfQMrqI&t=280s):
+
+1. 7 Rs: Relocate, Rehost, Replatform, Refactor, Rearchitect, Rebuild, Repurchase
+
+1. Write the migration as code
+
+1. Model data from the migrating systems as different types and write code that
+   converts between them (see the sketch after this list)
+
+1. Split the migration into discrete (isolated?) steps
+
+1. Have explicit asserts
+
+1. Log to file
+
+1. Comment, especially regarding edge cases
+
+1. Use progress bars
+
+1. Perform test migrations (against a "real" service if possible, i.e. in Ostack?)
+
+1. Store IDs from the old systems in the new system
+
+1. Migration is not a clean process, it's okay to write custom code for edge
+   cases. But treat them as a separate thing, don't try to make them fit with
+   the rest.
+
+1. There might be things that we don't need to migrate. Document why!
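+
+As a sketch of the "different types + converters" note above (in this patch the
+real pair is `ObsKDVH` and `ObsLARD` in `import/kdvh.go`; the row types below
+are hypothetical stand-ins, not the actual schema):
+
+```go
+package main
+
+import (
+    "fmt"
+    "strconv"
+    "time"
+)
+
+// KDVHRow models the legacy system's row: everything arrives as text.
+type KDVHRow struct {
+    ObsTime time.Time
+    Data    string // raw value as dumped, e.g. "12.3"
+}
+
+// LARDRow is the typed row the new system wants.
+type LARDRow struct {
+    ObsTime  time.Time
+    ObsValue float64
+}
+
+// convert owns the parsing edge cases in a single place,
+// so they can be asserted, logged, and tested in isolation.
+func convert(row KDVHRow) (LARDRow, error) {
+    val, err := strconv.ParseFloat(row.Data, 64)
+    if err != nil {
+        return LARDRow{}, fmt.Errorf("could not parse %q: %w", row.Data, err)
+    }
+    return LARDRow{ObsTime: row.ObsTime, ObsValue: val}, nil
+}
+
+func main() {
+    out, err := convert(KDVHRow{ObsTime: time.Now(), Data: "12.3"})
+    fmt.Println(out, err)
+}
+```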
diff --git a/kdvh_importer/dump/kdvh.go b/kdvh_importer/dump/kdvh.go
new file mode 100644
index 00000000..9b23d011
--- /dev/null
+++ b/kdvh_importer/dump/kdvh.go
@@ -0,0 +1,320 @@
+package dump
+
+import (
+    "database/sql"
+    "fmt"
+    "log"
+    "log/slog"
+    "os"
+    "path/filepath"
+    "slices"
+    "strings"
+
+    "kdvh_importer/utils"
+)
+
+// KDVHTable contains metadata on how to treat different tables in KDVH
+type KDVHTable struct {
+    TableName     string            // Name of the table with observations
+    FlagTableName string            // Name of the table with QC flags for observations
+    DumpFunc      TableDumpFunction // Function used to dump the table
+}
+
+func newKDVHTable(args ...string) *KDVHTable {
+    if len(args) < 1 || len(args) > 2 {
+        panic("This function accepts either one or two arguments")
+    }
+
+    dumpFunc := dumpDataAndFlags
+    var flagTableName string
+
+    if len(args) == 2 {
+        flagTableName = args[1]
+    } else {
+        dumpFunc = dumpDataOnly
+    }
+
+    return &KDVHTable{
+        TableName:     args[0],
+        FlagTableName: flagTableName,
+        DumpFunc:      dumpFunc,
+    }
+}
+
+// List of all the tables we care about
+var KDVH_TABLES = []*KDVHTable{
+    newKDVHTable("T_EDATA", "T_EFLAG"),
+    newKDVHTable("T_METARDATA"),
+    newKDVHTable("T_ADATA", "T_AFLAG"),
+    newKDVHTable("T_MDATA", "T_MFLAG"),
+    newKDVHTable("T_TJ_DATA", "T_TJ_FLAG"),
+    newKDVHTable("T_PDATA", "T_PFLAG"),
+    newKDVHTable("T_NDATA", "T_NFLAG"),
+    newKDVHTable("T_VDATA", "T_VFLAG"),
+    newKDVHTable("T_UTLANDDATA", "T_UTLANDFLAG"),
+    newKDVHTable("T_ADATA_LEVEL", "T_AFLAG_LEVEL"),
+    newKDVHTable("T_DIURNAL", "T_DIURNAL_FLAG"),
+    newKDVHTable("T_AVINOR", "T_AVINOR_FLAG"),
+    newKDVHTable("T_PROJDATA", "T_PROJFLAG"),
+    newKDVHTable("T_CDCV_DATA", "T_CDCV_FLAG"),
+    newKDVHTable("T_MERMAID", "T_MERMAID_FLAG"),
+    newKDVHTable("T_SVVDATA", "T_SVVFLAG"),
+    newKDVHTable("T_MONTH", "T_MONTH_FLAG"),
+    newKDVHTable("T_HOMOGEN_DIURNAL"),
+    {TableName: "T_10MINUTE_DATA", FlagTableName: "T_10MINUTE_FLAG", DumpFunc: dumpByYear},
+    {TableName: "T_MINUTE_DATA", FlagTableName: "T_MINUTE_FLAG", DumpFunc: dumpByYear},
+    {TableName: "T_SECOND_DATA", FlagTableName: "T_SECOND_FLAG", DumpFunc: dumpByYear},
+    {TableName: "T_HOMOGEN_MONTH", DumpFunc: dumpHomogenMonth},
+}
+
+type Element struct {
+    name        string
+    inFlagTable bool
+}
+type Elements []Element
+
+func (s *Elements) get(v string) (*Element, bool) {
+    for _, element := range *s {
+        if v == element.name {
+            return &element, true
+        }
+    }
+    return nil, false
+}
+
+func filterElements(slice []string, reference Elements) []Element {
+    if slice == nil {
+        return reference
+    }
+
+    out := make([]Element, 0, len(slice))
+    for _, e := range slice {
+        elem, ok := reference.get(e)
+        if !ok {
+            slog.Warn(fmt.Sprintf("Element '%s' not present in database", e))
+            continue
+        }
+        out = append(out, *elem)
+    }
+
+    return out
+}
+
+func (table *KDVHTable) dump(conn *sql.DB, config *DumpConfig) {
+    defer utils.SendEmailOnPanic("kdvh table.dump", config.Email)
+
+    // TODO: should probably do it at the station/element level?
+    outdir := filepath.Join(config.BaseDir, table.TableName+"_combined")
+    if _, err := os.ReadDir(outdir); err == nil && !config.Overwrite {
+        slog.Info(fmt.Sprint("Skipping data dump of ", table.TableName, " because dumped folder already exists"))
+        return
+    }
+
+    slog.Info(fmt.Sprint("Starting dump of ", table.TableName))
+    utils.SetLogFile(table.TableName, "dump")
+    elements, err := getElements(table, conn, config)
+    if err != nil {
+        return
+    }
+
+    // TODO: should be safe to spawn goroutines/waitgroup here with connection pool?
+    for _, element := range elements {
+        stations, err := getStationsWithElement(element, table, conn, config)
+        if err != nil {
+            slog.Error(fmt.Sprintf("Could not fetch stations for table %s: %v", table.TableName, err))
+            continue
+        }
+
+        for _, station := range stations {
+            path := filepath.Join(outdir, station)
+            if err := os.MkdirAll(path, os.ModePerm); err != nil {
+                slog.Error(err.Error())
+                continue
+            }
+
+            err := table.DumpFunc(
+                dumpFuncArgs{
+                    path:      path,
+                    element:   element,
+                    station:   station,
+                    dataTable: table.TableName,
+                    flagTable: table.FlagTableName,
+                },
+                conn,
+            )
+
+            // NOTE: Non-nil errors are logged inside each DumpFunc
+            if err == nil {
+                slog.Info(fmt.Sprintf("%s - %s - %s: dumped successfully", table.TableName, station, element.name))
+            }
+        }
+    }
+
+    log.SetOutput(os.Stdout)
+    log.Println("Finished dump of", table.TableName)
+}
+
+func getElements(table *KDVHTable, conn *sql.DB, config *DumpConfig) ([]Element, error) {
+    elements, err := table.fetchElements(conn)
+    if err != nil {
+        return nil, err
+    }
+
+    elements = filterElements(config.Elements, elements)
+    return elements, nil
+}
+
+func getStationsWithElement(element Element, table *KDVHTable, conn *sql.DB, config *DumpConfig) ([]string, error) {
+    stations, err := table.fetchStationsWithElement(element, conn)
+    if err != nil {
+        return nil, err
+    }
+
+    msg := fmt.Sprintf("Element '%s'", element.name) + " not available for station '%s'"
+    stations = utils.FilterSlice(config.Stations, stations, msg)
+    return stations, nil
+}
+
+func (table *KDVHTable) fetchElements(conn *sql.DB) ([]Element, error) {
+    // TODO: not sure why we only dump these two for this table
+    // TODO: separate this to its own function? Separate edge cases
+    if table.TableName == "T_HOMOGEN_MONTH" {
+        return []Element{{"rr", false}, {"tam", false}}, nil
+    }
+
+    elements, err := fetchColumnNames(table.TableName, conn)
+    if err != nil {
+        slog.Error(fmt.Sprintf("Could not fetch elements for table %s: %v", table.TableName, err))
+        return nil, err
+    }
+
+    // Check if element is present in flag table
+    // NOTE: For example, unknown element 'xxx' (which is an empty column) in table T_TJ_DATA is missing from T_TJ_FLAG
+    // TODO: probably should not fetch 'xxx' anyway since it's not in Stinfosys, and simply log
+    // if the element is not in the flag table? Because this feels like another edge case
+    if table.FlagTableName != "" {
+        flagElems, err := fetchColumnNames(table.FlagTableName, conn)
+        if err != nil {
+            slog.Error(fmt.Sprintf("Could not fetch elements for table %s: %v", table.FlagTableName, err))
+            return nil, err
+        }
+
+        for i, e := range elements {
+            if slices.Contains(flagElems, e) {
+                elements[i].inFlagTable = true
+            }
+        }
+
+        if len(elements) < len(flagElems) {
+            slog.Warn(fmt.Sprintf("%s contains more elements than %s", table.FlagTableName, table.TableName))
+        }
+    }
+
+    return elements, nil
+}
+
+// List of columns that are not selected in KDVH queries
+// TODO: what's the difference between obs_origtime and klobs (they have same paramid)?
+// Should they be added here? Do we need to exclude other elements?
+var INVALID_COLUMNS = []string{"dato", "stnr", "typeid", "season"}
+
+// Fetch column names for a given table
+func fetchColumnNames(tableName string, conn *sql.DB) ([]Element, error) {
+    slog.Info(fmt.Sprintf("Fetching elements for %s...", tableName))
+
+    rows, err := conn.Query(
+        "SELECT column_name FROM information_schema.columns WHERE table_name = $1 AND NOT column_name = ANY($2::text[])",
+        // NOTE: needs to be lowercase with PG
+        strings.ToLower(tableName),
+        INVALID_COLUMNS,
+    )
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+
+    var elements []Element
+    for rows.Next() {
+        var name string
+        if err = rows.Scan(&name); err != nil {
+            return nil, err
+        }
+        elements = append(elements, Element{name, false})
+    }
+    return elements, rows.Err()
+}
+
+func fetchStationNumbers(table *KDVHTable, conn *sql.DB) ([]string, error) {
+    slog.Info("Fetching station numbers (this can take a while)...")
+
+    // FIXME:? this can be extremely slow
+    query := fmt.Sprintf(
+        `SELECT DISTINCT stnr FROM %s`,
+        table.TableName,
+    )
+
+    if table.FlagTableName != "" {
+        query = fmt.Sprintf(
+            `(SELECT stnr FROM %s) UNION (SELECT stnr FROM %s)`,
+            table.TableName,
+            table.FlagTableName,
+        )
+    }
+
+    rows, err := conn.Query(query)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+
+    stations := make([]string, 0)
+    for rows.Next() {
+        var stnr string
+        if err := rows.Scan(&stnr); err != nil {
+            return nil, err
+        }
+        stations = append(stations, stnr)
+    }
+
+    return stations, rows.Err()
+}
+
+// NOTE: inverting the loops and splitting by element does make it a bit better,
+// because we avoid querying for tables that have no data or flag for that element
+func (table *KDVHTable) fetchStationsWithElement(element Element, conn *sql.DB) ([]string, error) {
+    slog.Info(fmt.Sprintf("Fetching station numbers for %s (this can take a while)...", element.name))
+
+    query := fmt.Sprintf(
+        `SELECT DISTINCT stnr FROM %s WHERE %s IS NOT NULL`,
+        table.TableName,
+        element.name,
+    )
+
+    if table.FlagTableName != "" && element.inFlagTable {
+        query = fmt.Sprintf(
+            `(SELECT stnr FROM %[2]s WHERE %[1]s IS NOT NULL) UNION (SELECT stnr FROM %[3]s WHERE %[1]s IS NOT NULL)`,
+            element.name,
+            table.TableName,
+            table.FlagTableName,
+        )
+    }
+
+    rows, err := conn.Query(query)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+
+    stations := make([]string, 0)
+    for rows.Next() {
+        var stnr string
+        if err := rows.Scan(&stnr); err != nil {
+            return nil, err
+        }
+        stations = append(stations, stnr)
+    }
+
+    // log.Println(stations)
+    return stations, rows.Err()
+}
diff --git a/kdvh_importer/dump/kdvh_dump_functions.go b/kdvh_importer/dump/kdvh_dump_functions.go
new file mode 100644
index 00000000..10b8dd46
--- /dev/null
+++ b/kdvh_importer/dump/kdvh_dump_functions.go
@@ -0,0 +1,292 @@
+package dump
+
+import (
+    "database/sql"
+    "encoding/csv"
+    "errors"
+    "fmt"
+    "io"
+    "log/slog"
+    "os"
+    "path/filepath"
+    "strconv"
+    "time"
+)
+
+type TableDumpFunction func(dumpFuncArgs, *sql.DB) error
+type dumpFuncArgs struct {
+    path      string
+    element   Element
+    station   string
+    dataTable string
+    flagTable string
+}
+
+// Fetch min and max year from table, needed for tables that are dumped by year
+func fetchYearRange(tableName, station string, conn *sql.DB) (int64, int64, error) {
+    var beginStr, endStr string
+    query := fmt.Sprintf("SELECT min(to_char(dato, 'yyyy')), max(to_char(dato, 'yyyy')) FROM %s WHERE stnr = $1", tableName)
+
+    if err := conn.QueryRow(query, station).Scan(&beginStr, &endStr); err != nil {
+        slog.Error(fmt.Sprint("Could not query row: ", err))
+        return 0, 0, err
+    }
+
+    begin, err := strconv.ParseInt(beginStr, 10, 64)
+    if err != nil {
+        slog.Error(fmt.Sprintf("Could not parse year '%s': %s", beginStr, err))
+        return 0, 0, err
+    }
+
+    end, err := strconv.ParseInt(endStr, 10, 64)
+    if err != nil {
+        slog.Error(fmt.Sprintf("Could not parse year '%s': %s", endStr, err))
+        return 0, 0, err
+    }
+
+    return begin, end, nil
+}
+
+func dumpByYearDataOnly(args dumpFuncArgs, conn *sql.DB) error {
+    begin, end, err := fetchYearRange(args.dataTable, args.station, conn)
+    if err != nil {
+        return err
+    }
+
+    query := fmt.Sprintf(
+        `SELECT dato AS time, %[1]s AS data FROM %[2]s
+            WHERE %[1]s IS NOT NULL AND stnr = $1 AND TO_CHAR(dato, 'yyyy') = $2`,
+        args.element.name,
+        args.dataTable,
+    )
+
+    for year := begin; year <= end; year++ {
+        yearStr := strconv.FormatInt(year, 10)
+        rows, err := conn.Query(query, args.station, yearStr)
+        if err != nil {
+            slog.Error(fmt.Sprint("Could not query KDVH: ", err))
+            return err
+        }
+
+        path := filepath.Join(args.path, yearStr)
+        if err := os.MkdirAll(path, os.ModePerm); err != nil {
+            slog.Error(err.Error())
+            continue
+        }
+
+        if err := dumpToFile(path, args.element.name, rows); err != nil {
+            slog.Error(err.Error())
+            return err
+        }
+    }
+
+    return nil
+}
+
+func dumpByYear(args dumpFuncArgs, conn *sql.DB) error {
+    if !args.element.inFlagTable {
+        return dumpByYearDataOnly(args, conn)
+    }
+
+    dataBegin, dataEnd, err := fetchYearRange(args.dataTable, args.station, conn)
+    if err != nil {
+        return err
+    }
+
+    flagBegin, flagEnd, err := fetchYearRange(args.flagTable, args.station, conn)
+    if err != nil {
+        return err
+    }
+
+    begin := min(dataBegin, flagBegin)
+    end := max(dataEnd, flagEnd)
+
+    query := fmt.Sprintf(
+        `SELECT
+            COALESCE(d.dato, f.dato) AS time,
+            d.%[1]s AS data,
+            f.%[1]s AS flag
+        FROM
+            (SELECT dato, stnr, %[1]s FROM %[2]s
+                WHERE %[1]s IS NOT NULL AND stnr = $1 AND TO_CHAR(dato, 'yyyy') = $2) d
+        FULL OUTER JOIN
+            (SELECT dato, stnr, %[1]s FROM %[3]s
+                WHERE %[1]s IS NOT NULL AND stnr = $1 AND TO_CHAR(dato, 'yyyy') = $2) f
+        ON d.dato = f.dato`,
+        args.element.name,
+        args.dataTable,
+        args.flagTable,
+    )
+
+    for year := begin; year <= end; year++ {
+        yearStr := strconv.FormatInt(year, 10)
+        rows, err := conn.Query(query, args.station, yearStr)
+        if err != nil {
+            slog.Error(fmt.Sprint("Could not query KDVH: ", err))
+            return err
+        }
+
+        path := filepath.Join(args.path, yearStr)
+        if err := os.MkdirAll(path, os.ModePerm); err != nil {
+            slog.Error(err.Error())
+            continue
+        }
+
+        if err := dumpToFile(path, args.element.name, rows); err != nil {
+            slog.Error(err.Error())
+            return err
+        }
+    }
+
+    return nil
+}
+
+func dumpHomogenMonth(args dumpFuncArgs, conn *sql.DB) error {
+    query := fmt.Sprintf(
+        `SELECT dato AS time, %[1]s AS data, '' AS flag FROM T_HOMOGEN_MONTH
+            WHERE %[1]s IS NOT NULL AND stnr = $1 AND season BETWEEN 1 AND 12`,
+        args.element.name,
+    )
+
+    rows, err := conn.Query(query, args.station)
+    if err != nil {
+        slog.Error(err.Error())
+        return err
+    }
+
+    if err := dumpToFile(args.path, args.element.name, rows); err != nil {
+        slog.Error(err.Error())
+        return err
+    }
+
+    return nil
+}
+
+func dumpDataOnly(args dumpFuncArgs, conn *sql.DB) error {
+    query := fmt.Sprintf(
+        "SELECT dato AS time, %[1]s AS data, '' AS flag FROM %[2]s WHERE %[1]s IS NOT NULL AND stnr = $1",
+        args.element.name,
+        args.dataTable,
+    )
+
+    rows, err := conn.Query(query, args.station)
+    if err != nil {
+        slog.Error(err.Error())
+        return err
+    }
+
+    if err := dumpToFile(args.path, args.element.name, rows); err != nil {
+        slog.Error(err.Error())
+        return err
+    }
+
+    return nil
+}
+
+func dumpDataAndFlags(args dumpFuncArgs, conn *sql.DB) error {
+    if !args.element.inFlagTable {
+        return dumpDataOnly(args, conn)
+    }
+
+    query := fmt.Sprintf(
+        `SELECT
+            COALESCE(d.dato, f.dato) AS time,
+            d.%[1]s AS data,
+            f.%[1]s AS flag
+        FROM
+            (SELECT dato, %[1]s FROM %[2]s WHERE %[1]s IS NOT NULL AND stnr = $1) d
+        FULL OUTER JOIN
+            (SELECT dato, %[1]s FROM %[3]s WHERE %[1]s IS NOT NULL AND stnr = $1) f
+        ON d.dato = f.dato`,
+        args.element.name,
+        args.dataTable,
+        args.flagTable,
+    )
+
+    rows, err := conn.Query(query, args.station)
+    if err != nil {
+        slog.Error(err.Error())
+        return err
+    }
+
+    if err := dumpToFile(args.path, args.element.name, rows); err != nil {
+        slog.Error(err.Error())
+        return err
+    }
+
+    return nil
+}
+
+func dumpToFile(path, element string, rows *sql.Rows) error {
+    filename := filepath.Join(path, element+".csv")
+    file, err := os.Create(filename)
+    if err != nil {
+        return err
+    }
+
+    err = writeElementFile(rows, file)
+    if closeErr := file.Close(); closeErr != nil {
+        return errors.Join(err, closeErr)
+    }
+    return err
+}
+
+// Writes queried (time | data | flag) columns to CSV
+func writeElementFile(rows *sql.Rows, file io.Writer) error {
+    defer rows.Close()
+
+    floatFormat := "%.2f"
+    timeFormat := "2006-01-02_15:04:05"
+
+    columns, err := rows.Columns()
+    if err != nil {
+        return errors.New("Could not get columns: " + err.Error())
+    }
+
+    count := len(columns)
+    line := make([]string, count)
+    values := make([]interface{}, count)
+    pointers := make([]interface{}, count)
+
+    writer := csv.NewWriter(file)
+    // writer.Comma = ';'
+
+    for rows.Next() {
+        for i := range columns {
+            pointers[i] = &values[i]
+        }
+
+        if err := rows.Scan(pointers...); err != nil {
+            return errors.New("Could not scan rows: " + err.Error())
+        }
+
+        // Parse scanned types
+        for i := range columns {
+            var value string
+
+            switch v := values[i].(type) {
+            case []byte:
+                value = string(v)
+            case float64, float32:
+                value = fmt.Sprintf(floatFormat, v)
+            case time.Time:
+                value = v.Format(timeFormat)
+            case nil:
+                value = ""
+            default:
+                value = fmt.Sprintf("%v", v)
+            }
+
+            line[i] = value
+        }
+
+        if err := writer.Write(line); err != nil {
+            return errors.New("Could not write to file: " + err.Error())
+        }
+    }
+
+    writer.Flush()
+    if err := writer.Error(); err != nil {
+        return err
+    }
+
+    return rows.Err()
+}
diff --git a/kdvh_importer/dump/main.go b/kdvh_importer/dump/main.go
new file mode 100644
index 00000000..f78a735e
--- /dev/null
+++ b/kdvh_importer/dump/main.go
@@ -0,0 +1,69 @@
+package dump
+
+import (
+    "database/sql"
+    "log/slog"
+    "os"
+    "slices"
+    "strings"
+
+    // NOTE (added): sql.Open("pgx", ...) below needs the pgx stdlib adapter
+    // to register the "pgx" driver, assuming it is not blank-imported elsewhere
+    _ "github.com/jackc/pgx/v5/stdlib"
+)
+
+type DumpConfig struct {
+    BaseDir     string   `long:"dir" default:"./" description:"Base directory where the dumped data is stored"`
+    TablesCmd   string   `long:"table" default:"" description:"Optional comma separated list of table names. By default all available tables are processed"`
+    StationsCmd string   `long:"station" default:"" description:"Optional comma separated list of station IDs. By default all station IDs are processed"`
+    ElementsCmd string   `long:"elemcode" default:"" description:"Optional comma separated list of element codes. By default all element codes are processed"`
+    Overwrite   bool     `long:"overwrite" description:"Overwrite any existing dumped files"`
+    Email       []string `long:"email" description:"Optional email address used to notify if the program crashed"`
+    Tables      []string
+    Stations    []string
+    Elements    []string
+}
+
+// Populates config slices by splitting cmd strings
+func (config *DumpConfig) setup() {
+    if config.TablesCmd != "" {
+        config.Tables = strings.Split(config.TablesCmd, ",")
+    }
+    if config.StationsCmd != "" {
+        config.Stations = strings.Split(config.StationsCmd, ",")
+    }
+    if config.ElementsCmd != "" {
+        config.Elements = strings.Split(config.ElementsCmd, ",")
+    }
+}
+
+func (config *DumpConfig) Execute(_ []string) error {
+    config.setup()
+
+    if err := dumpKDVH(config); err != nil {
+        return err
+    }
+    return dumpKvalobs(config)
+}
+
+func dumpKDVH(config *DumpConfig) error {
+    // TODO: make sure we don't need direct KDVH connection
+    // dvhConn := getDB(os.Getenv("DVH_STRING"))
+    // klima11Conn := getDB(os.Getenv("KLIMA11_STRING"))
+
+    conn, err := sql.Open("pgx", os.Getenv("KDVH_PROXY_CONN"))
+    if err != nil {
+        slog.Error(err.Error())
+        return err
+    }
+    defer conn.Close()
+
+    for _, table := range KDVH_TABLES {
+        if config.Tables != nil && !slices.Contains(config.Tables, table.TableName) {
+            continue
+        }
+        table.dump(conn, config)
+    }
+
+    return nil
+}
+
+func dumpKvalobs(config *DumpConfig) error {
+    // TODO:
+    return nil
+}
diff --git a/kdvh_importer/go.mod b/kdvh_importer/go.mod
index 4be74088..df1d7695 100644
--- a/kdvh_importer/go.mod
+++ b/kdvh_importer/go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
 	github.com/jackc/puddle/v2 v2.2.1 // indirect
 	github.com/rickb777/plural v1.4.2 // indirect
+	github.com/schollz/progressbar v1.0.0 // indirect
 	golang.org/x/crypto v0.25.0 // indirect
 	golang.org/x/sync v0.7.0 // indirect
 	golang.org/x/sys v0.22.0 // indirect
diff --git a/kdvh_importer/go.sum b/kdvh_importer/go.sum
index f7816baf..6a23d802 100644
--- a/kdvh_importer/go.sum
+++ b/kdvh_importer/go.sum
@@ -27,6 +27,8 @@
 github.com/rickb777/period v1.0.5 h1:jAzlI2knYam5VMy0X8eYgqJBl0ew57N+J1djJSBOulM=
 github.com/rickb777/period v1.0.5/go.mod h1:AmEwpgIShi3EEw34qbafoPJxVeRbv9VVtjLyOeRwK6c=
 github.com/rickb777/plural v1.4.2 h1:Kl/syFGLFZ5EbuV8c9SVud8s5HI2HpCCtOMw2U1kS+A=
 github.com/rickb777/plural v1.4.2/go.mod h1:kdmXUpmKBJTS0FtG/TFumd//VBWsNTD7zOw7x4umxNw=
+github.com/schollz/progressbar v1.0.0 h1:gbyFReLHDkZo8mxy/dLWMr+Mpb1MokGJ1FqCiqacjZM=
+github.com/schollz/progressbar v1.0.0/go.mod h1:/l9I7PC3L3erOuz54ghIRKUEFcosiWfLvJv+Eq26UMs=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
diff --git a/kdvh_importer/import/kdvh.go b/kdvh_importer/import/kdvh.go
new file mode 100644
index 00000000..c172fd3e
--- /dev/null
+++ b/kdvh_importer/import/kdvh.go
@@ -0,0 +1,608 @@
+package main
+
+import (
+    "bufio"
+    "context"
+    "errors"
+    "fmt"
+    "io"
+    "log"
+    "log/slog"
+    "os"
+    "path/filepath"
+    "slices"
+    "strconv"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/gocarina/gocsv"
+    "github.com/jackc/pgx/v5"
+    "github.com/jackc/pgx/v5/pgxpool"
+    "github.com/rickb777/period"
+)
+
+// Used for lookup of fromtime and totime from KDVH
+type KDVHKey struct {
+    ParamKey
+    Station int64
+}
+
+// ParamKey is used for lookup of parameter offsets and metadata from Stinfosys
+type ParamKey struct {
+    ElemCode  string `json:"ElemCode"`
+    TableName string `json:"TableName"`
+}
+
+// Query from Stinfosys elem_map_cfnames_param
+type Metadata struct {
+    ElemCode  string
+    TableName string
+    TypeID    int32
+    ParamID   int32
+    Hlevel    int32
+    Sensor    int32
+    Fromtime  *time.Time
+    // totime *time.Time
+}
+
+// Metadata in KDVH tables
+type MetaKDVH struct {
+    TableName string
+    Station   int64
+    ElemCode  string
+    FromTime  *time.Time
+    ToTime    *time.Time
+}
+
+// TODO: define the schema we want to export here
+// And probably we should use nullable types?
+type ObsLARD struct {
+    // Unique timeseries identifier
+    ID int32
+    // Time of observation
+    ObsTime time.Time
+    // Observation data formatted as a double precision floating point
+    Data float64
+    // Observation data formatted as a binary large object.
+    // TODO: what is this?
+    DataBlob []byte
+    // Estimated observation data from KDVH (unknown method)
+    // NOTE: unknown method ??????????
+    CorrKDVH float64
+    // Latest updated corrected observation data value
+    // Corrected float64
+    // Flag encoding quality control status
+    KVFlagControlInfo []byte
+    // Flag encoding quality control status
+    KVFlagUseInfo []byte
+    // Subset of 5 digits of KVFlagUseInfo stored in KDVH
+    // KDVHFlag []byte
+    // Comma separated value listing checks that failed during quality control
+    // KVCheckFailed string
+}
+
+// Struct holding (almost) all the info needed from KDVH
+type ObsKDVH struct {
+    ID       int32
+    Offset   period.Period
+    ElemCode string
+    ObsTime  time.Time
+    Data     string
+    Flags    string
+}
+
+type TimeseriesInfo struct {
+    ID       int32
+    Offset   period.Period
+    ElemCode string
+    Meta     *MetaKDVH
+}
+
+type ImportConfig struct {
+    BaseDir     string   `long:"dir" default:"./" description:"Base directory where the dumped data is stored"`
+    Sep         string   `long:"sep" default:"," description:"Separator character in the dumped files. Needs to be quoted"`
+    TablesCmd   string   `long:"table" default:"" description:"Optional comma separated list of table names. By default all available tables are processed"`
+    StationsCmd string   `long:"station" default:"" description:"Optional comma separated list of station IDs. By default all station IDs are processed"`
+    ElementsCmd string   `long:"elemcode" default:"" description:"Optional comma separated list of element codes. By default all element codes are processed"`
+    HasHeader   bool     `long:"has-header" description:"Add this flag if the dumped files have a header row"`
+    SkipData    bool     `long:"skip-data" description:"Skip import of data"`
+    SkipFlags   bool     `long:"skip-flags" description:"Skip import of flags"`
+    Email       []string `long:"email" description:"Optional email address used to notify if the program crashed"`
+    Tables      []string
+    Stations    []string
+    Elements    []string
+    OffsetMap   map[ParamKey]period.Period // Map of offsets used to correct (?) KDVH times for specific parameters
+    StinfoMap   map[ParamKey]Metadata      // Map of metadata used to query timeseries ID in LARD
+    KDVHMap     map[KDVHKey]*MetaKDVH      // Map of from_time and to_time for each (table, station, element) triplet
+}
+
+// Sets up config:
+// - Checks validity of cmd args
+// - Populates slices by parsing the strings provided via cmd
+// - Caches time offsets by reading 'product_offsets.csv'
+// - Caches metadata from Stinfosys
+// - Caches metadata from KDVH proxy
+func (config *ImportConfig) setup() {
+    if config.SkipData && config.SkipFlags {
+        slog.Error("Both '--skip-data' and '--skip-flags' are set, nothing to import")
+        os.Exit(1)
+    }
+
+    if len(config.Sep) > 1 {
+        slog.Warn("'--sep' only accepts single-byte characters. Defaulting to ','")
+        config.Sep = ","
+    }
+
+    if config.TablesCmd != "" {
+        config.Tables = strings.Split(config.TablesCmd, ",")
+    }
+    if config.StationsCmd != "" {
+        config.Stations = strings.Split(config.StationsCmd, ",")
+    }
+    if config.ElementsCmd != "" {
+        config.Elements = strings.Split(config.ElementsCmd, ",")
+    }
+
+    config.OffsetMap = cacheParamOffsets()
+    config.StinfoMap = cacheStinfo(config.Tables, config.Elements)
+    config.KDVHMap = cacheKDVH(config.Tables, config.Stations, config.Elements)
+}
+
+// This method is automatically called by go-flags while parsing the cmd
+func (config *ImportConfig) Execute(_ []string) error {
+    config.setup()
+
+    // Create connection pool for LARD
+    pool, err := pgxpool.New(context.TODO(), os.Getenv("LARD_STRING"))
+    if err != nil {
+        slog.Error(fmt.Sprint("Could not connect to LARD: ", err))
+        return err
+    }
+    defer pool.Close()
+
+    for _, table := range KDVH_TABLE_INSTRUCTIONS {
+        if config.Tables != nil && !slices.Contains(config.Tables, table.TableName) {
+            continue
+        }
+        table.updateDefaults()
+        importTable(pool, table, config)
+    }
+
+    return nil
+}
+
+// TODO: need to extract scalar field
+// Save metadata for later use by querying Stinfosys
+func cacheStinfo(tables, elements []string) map[ParamKey]Metadata {
+    cache := make(map[ParamKey]Metadata)
+
+    slog.Info("Connecting to Stinfosys to cache metadata")
+    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+    defer cancel()
+
+    conn, err := pgx.Connect(ctx, os.Getenv("STINFO_STRING"))
+    if err != nil {
+        slog.Error(fmt.Sprint("Could not connect to Stinfosys. Make sure to be connected to the VPN. ", err))
+        os.Exit(1)
+    }
+    defer conn.Close(context.TODO())
+
+    for _, table := range KDVH_TABLE_INSTRUCTIONS {
+        if tables != nil && !slices.Contains(tables, table.TableName) {
+            continue
+        }
+
+        query := `SELECT elem_code, table_name, typeid, paramid, hlevel, sensor, fromtime
+            FROM elem_map_cfnames_param
+            WHERE table_name = $1
+            AND ($2::text[] IS NULL OR elem_code = ANY($2))`
+
+        rows, err := conn.Query(context.TODO(), query, table.TableName, elements)
+        if err != nil {
+            slog.Error(err.Error())
+            os.Exit(1)
+        }
+
+        // TODO: eventually move to RowToStructByName (less brittle, but requires adding tags to the struct)
+        metas, err := pgx.CollectRows(rows, pgx.RowToStructByPos[Metadata])
+        if err != nil {
+            slog.Error(err.Error())
+            os.Exit(1)
+        }
+
+        for _, meta := range metas {
+            // log.Println(meta)
+            cache[ParamKey{meta.ElemCode, meta.TableName}] = meta
+        }
+    }
+
+    return cache
+}
+
+func cacheKDVH(tables, stations, elements []string) map[KDVHKey]*MetaKDVH {
+    cache := make(map[KDVHKey]*MetaKDVH)
+
+    slog.Info("Connecting to KDVH proxy to cache metadata")
+    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+    defer cancel()
+
+    conn, err := pgx.Connect(ctx, os.Getenv("KDVH_PROXY_CONN"))
+    if err != nil {
+        slog.Error(fmt.Sprint("Could not connect to KDVH proxy. Make sure to be connected to the VPN. ", err))
+        os.Exit(1)
+    }
+    defer conn.Close(context.TODO())
+
+    for _, t := range KDVH_TABLE_INSTRUCTIONS {
+        if tables != nil && !slices.Contains(tables, t.TableName) {
+            continue
+        }
+
+        // TODO: probably need to sanitize these inputs
+        query := fmt.Sprintf(
+            `SELECT table_name, stnr, elem_code, fdato, tdato FROM %s
+                WHERE ($1::bigint[] IS NULL OR stnr = ANY($1))
+                AND ($2::text[] IS NULL OR elem_code = ANY($2))`,
+            t.ElemTableName,
+        )
+
+        rows, err := conn.Query(context.TODO(), query, stations, elements)
+        if err != nil {
+            slog.Error(err.Error())
+            os.Exit(1)
+        }
+
+        metas, err := pgx.CollectRows(rows, pgx.RowToStructByPos[MetaKDVH])
+        if err != nil {
+            slog.Error(err.Error())
+            os.Exit(1)
+        }
+
+        for _, meta := range metas {
+            // NOTE: copy the loop variable, otherwise (before Go 1.22) every
+            // map entry would point to the same, last element
+            meta := meta
+            cache[KDVHKey{ParamKey{meta.ElemCode, meta.TableName}, meta.Station}] = &meta
+        }
+    }
+
+    return cache
+}
+
+// Caches how to modify the obstime (in KDVH) for certain paramids
+func cacheParamOffsets() map[ParamKey]period.Period {
+    cache := make(map[ParamKey]period.Period)
+
+    type CSVRow struct {
+        TableName      string `csv:"table_name"`
+        ElemCode       string `csv:"elem_code"`
+        ParamID        int64  `csv:"paramid"`
+        FromtimeOffset string `csv:"fromtime_offset"`
+        Timespan       string `csv:"timespan"`
+    }
+    csvfile, err := os.Open("product_offsets.csv")
+    if err != nil {
+        slog.Error(err.Error())
+        os.Exit(1)
+    }
+    defer csvfile.Close()
+
+    var csvrows []CSVRow
+    if err := gocsv.UnmarshalFile(csvfile, &csvrows); err != nil {
+        slog.Error(err.Error())
+        os.Exit(1)
+    }
+
+    for _, row := range csvrows {
+        var fromtimeOffset, timespan period.Period
+        if row.FromtimeOffset != "" {
+            fromtimeOffset, err = period.Parse(row.FromtimeOffset)
+            if err != nil {
+                slog.Error(err.Error())
+                os.Exit(1)
+            }
+        }
+        if row.Timespan != "" {
+            timespan, err = period.Parse(row.Timespan)
+            if err != nil {
+                slog.Error(err.Error())
+                os.Exit(1)
+            }
+        }
+        migrationOffset, err := fromtimeOffset.Add(timespan)
+        if err != nil {
+            slog.Error(err.Error())
+            os.Exit(1)
+        }
+
+        cache[ParamKey{ElemCode: row.ElemCode, TableName: row.TableName}] = migrationOffset
+    }
+
+    return cache
+}
+
+func importTable(pool *pgxpool.Pool, table *TableInstructions, config *ImportConfig) {
+    defer sendEmailOnPanic("importTable", config.Email)
+
+    if table.ImportUntil == 0 {
+        // log.Printf("Skipping import of %s because this table is not set for import", table.TableName)
+        return
+    }
+
+    slog.Info(fmt.Sprint("Starting import of ", table.TableName))
+    setLogFile(table.TableName, "import")
+
+    path := filepath.Join(config.BaseDir, table.TableName+"_combined")
+    stations, err := os.ReadDir(path)
+    if err != nil {
+        slog.Warn(fmt.Sprintf("Could not read directory %s: %s", path, err))
+        return
+    }
+
+    for _, station := range stations {
+        stnr, err := getStationNumber(station, config.Stations)
+        if err != nil {
+            slog.Warn(err.Error())
+            continue
+        }
+
+        stationDir := filepath.Join(path, station.Name())
+        elements, err := os.ReadDir(stationDir)
+        if err != nil {
+            slog.Warn(fmt.Sprintf("Could not read directory %s: %s", stationDir, err))
+            continue
+        }
+
+        var wg sync.WaitGroup
+        for _, element := range elements {
+            elemCode, err := getElementCode(element, config.Elements)
+            if err != nil {
+                slog.Warn(err.Error())
+                continue
+            }
+            filename := filepath.Join(stationDir, element.Name())
+
+            wg.Add(1)
+            go func() {
+                defer wg.Done()
+
+                handle, err := os.Open(filename)
+                if err != nil {
+                    slog.Warn(fmt.Sprintf("Could not open file '%s': %s", filename, err))
+                    return
+                }
+                defer handle.Close()
+
+                timeseries, err := getTimeseries(elemCode, table.TableName, stnr, pool, config)
+                if err != nil {
+                    slog.Error(fmt.Sprintf("%v - %v - %v: could not obtain timeseries, %s", table.TableName, stnr, elemCode, err))
+                    return
+                }
+
+                data, err := parseData(handle, timeseries, table, config)
+                if err != nil {
+                    slog.Error(fmt.Sprintf("Could not parse file '%s': %s", filename, err))
+                    return
+                }
+
+                if len(data) == 0 {
+                    slog.Info(fmt.Sprintf("%v - %v - %v: no rows to insert (all obstimes > max import time)", table.TableName, stnr, elemCode))
+                    return
+                }
+
+                // TODO: we should be careful here, data shouldn't contain non-scalar params
+                // Otherwise we need to insert to the non-scalar table
+                if !config.SkipData {
+                    count, err := insertData(pool, data)
+                    if err != nil {
+                        slog.Error(fmt.Sprintf("%v - %v - %v: failed data bulk insertion, %s", table.TableName, stnr, elemCode, err))
+                        return
+                    }
+
+                    logStr := fmt.Sprintf("%v - %v - %v: %v/%v data rows inserted", table.TableName, stnr, elemCode, count, len(data))
+                    if int(count) != len(data) {
+                        slog.Warn(logStr)
+                    } else {
+                        slog.Info(logStr)
+                    }
+                }
+
+                if !config.SkipFlags {
+                    count, err := insertFlags(pool, data)
+                    if err != nil {
+                        slog.Error(fmt.Sprintf("%v - %v - %v: failed flags bulk insertion, %s", table.TableName, stnr, elemCode, err))
+                        return
+                    }
+                    logStr := fmt.Sprintf("%v - %v - %v: %v/%v flags rows inserted", table.TableName, stnr, elemCode, count, len(data))
+                    if int(count) != len(data) {
+                        slog.Warn(logStr)
+                    } else {
+                        slog.Info(logStr)
+                    }
+                }
+            }()
+        }
+        wg.Wait()
+    }
+
+    log.SetOutput(os.Stdout)
+    slog.Info(fmt.Sprint("Finished import of ", table.TableName))
+}
+
+func getStationNumber(station os.DirEntry, stationList []string) (int64, error) {
+    if !station.IsDir() {
+        return 0, fmt.Errorf("%s is not a directory, skipping", station.Name())
+    }
+
+    if stationList != nil && !slices.Contains(stationList, station.Name()) {
+        return 0, fmt.Errorf("Station %v not in the list, skipping", station.Name())
+    }
+
+    stnr, err := strconv.ParseInt(station.Name(), 10, 64)
+    if err != nil {
+        return 0, errors.New("Error parsing station number: " + err.Error())
+    }
+
+    return stnr, nil
+}
+
+func
 getElementCode(element os.DirEntry, elementList []string) (string, error) {
+    elemCode := strings.ToUpper(strings.TrimSuffix(element.Name(), ".csv"))
+
+    if elementList != nil && !slices.Contains(elementList, elemCode) {
+        return "", fmt.Errorf("Element '%s' not in the list, skipping", elemCode)
+    }
+
+    if elemcodeIsInvalid(elemCode) {
+        return "", fmt.Errorf("Element '%s' not set for import, skipping", elemCode)
+    }
+    return elemCode, nil
+}
+
+func parseData(handle io.Reader, ts *TimeseriesInfo, table *TableInstructions, config *ImportConfig) ([]ObsLARD, error) {
+    scanner := bufio.NewScanner(handle)
+
+    // Skip header if present
+    if config.HasHeader {
+        scanner.Scan()
+    }
+
+    var data []ObsLARD
+    for scanner.Scan() {
+        cols := strings.Split(scanner.Text(), config.Sep)
+
+        obsTime, err := time.Parse("2006-01-02_15:04:05", cols[0])
+        if err != nil {
+            return nil, err
+        }
+
+        // Only import data between KDVH's defined fromtime and totime
+        // TODO: not 100% sure why we need this?
+        if ts.Meta != nil {
+            if ts.Meta.FromTime != nil && obsTime.Sub(*ts.Meta.FromTime) < 0 {
+                continue
+            }
+
+            if ts.Meta.ToTime != nil && obsTime.Sub(*ts.Meta.ToTime) > 0 {
+                break
+            }
+        }
+
+        if obsTime.Year() >= table.ImportUntil {
+            break
+        }
+
+        temp, err := table.ConvFunc(
+            ObsKDVH{
+                ID:       ts.ID,
+                Offset:   ts.Offset,
+                ElemCode: ts.ElemCode,
+                ObsTime:  obsTime,
+                Data:     cols[1],
+                Flags:    cols[2],
+            },
+        )
+        if err != nil {
+            return nil, err
+        }
+        data = append(data, temp)
+    }
+    return data, nil
+}
+
+// TODO: benchmark double copyfrom vs batch insert?
+func insertData(pool *pgxpool.Pool, data []ObsLARD) (int64, error) {
+    return pool.CopyFrom(
+        context.TODO(),
+        pgx.Identifier{"public", "data"},
+        []string{"timeseries", "obstime", "obsvalue"},
+        pgx.CopyFromSlice(len(data), func(i int) ([]any, error) {
+            return []any{
+                data[i].ID,
+                data[i].ObsTime,
+                data[i].CorrKDVH,
+            }, nil
+        }),
+    )
+}
+
+func insertFlags(pool *pgxpool.Pool, data []ObsLARD) (int64, error) {
+    return pool.CopyFrom(
+        context.TODO(),
+        pgx.Identifier{"flags", "kdvh"},
+        []string{"timeseries", "obstime", "controlinfo", "useinfo"},
+        pgx.CopyFromSlice(len(data), func(i int) ([]any, error) {
+            return []any{
+                data[i].ID,
+                data[i].ObsTime,
+                data[i].KVFlagControlInfo,
+                data[i].KVFlagUseInfo,
+            }, nil
+        }),
+    )
+}
+
+func getTimeseries(elemCode, tableName string, stnr int64, pool *pgxpool.Pool, config *ImportConfig) (*TimeseriesInfo, error) {
+    var tsid int32
+    key := ParamKey{elemCode, tableName}
+
+    offset := config.OffsetMap[key]
+    stinfoMeta, ok := config.StinfoMap[key]
+    if !ok {
+        // TODO: should it fail here? How do we deal with data without metadata?
+        return nil, errors.New("Missing metadata in Stinfosys")
+    }
+
+    // If there's no metadata in KDVH we insert anyway
+    kdvhMeta := config.KDVHMap[KDVHKey{key, stnr}]
+
+    // Query LARD labels table with Stinfosys metadata
+    err := pool.QueryRow(
+        context.TODO(),
+        `SELECT timeseries FROM labels.met
+            WHERE station_id = $1
+            AND param_id = $2
+            AND type_id = $3
+            AND (($4::int IS NULL AND lvl IS NULL) OR (lvl = $4))
+            AND (($5::int IS NULL AND sensor IS NULL) OR (sensor = $5))`,
+        stnr, stinfoMeta.ParamID, stinfoMeta.TypeID, stinfoMeta.Hlevel, stinfoMeta.Sensor).Scan(&tsid)
+
+    // If the timeseries exists, return its ID, offset and metadata
+    if err == nil {
+        return &TimeseriesInfo{tsid, offset, stinfoMeta.ElemCode, kdvhMeta}, nil
+    } else if !errors.Is(err, pgx.ErrNoRows) {
+        // NOTE: only fall through to the insert below if the row was missing,
+        // not if the lookup failed for some other reason
+        return nil, err
+    }
+
+    // Otherwise insert a new timeseries
+    transaction, err := pool.Begin(context.TODO())
+    if err != nil {
+        return nil, err
+    }
+
+    err = transaction.QueryRow(
+        context.TODO(),
+        `INSERT INTO public.timeseries (fromtime) VALUES ($1) RETURNING id`,
+        stinfoMeta.Fromtime,
+    ).Scan(&tsid)
+    if err != nil {
+        return nil, err
+    }
+
+    _, err = transaction.Exec(
+        context.TODO(),
+        `INSERT INTO labels.met (timeseries, station_id, param_id, type_id, lvl, sensor)
+            VALUES ($1, $2, $3, $4, $5, $6)`,
+        tsid, stnr, stinfoMeta.ParamID, stinfoMeta.TypeID, stinfoMeta.Hlevel, stinfoMeta.Sensor)
+    if err != nil {
+        return nil, err
+    }
+
+    if err := transaction.Commit(context.TODO()); err != nil {
+        return nil, err
+    }
+
+    return &TimeseriesInfo{tsid, offset, stinfoMeta.ElemCode, kdvhMeta}, nil
+}
+
+// TODO: add CALL_SIGN? It's not in stinfosys?
+var INVALID_ELEMENTS = []string{"TYPEID", "TAM_NORMAL_9120", "RRA_NORMAL_9120", "OT", "OTN", "OTX", "DD06", "DD12", "DD18"}
+
+func elemcodeIsInvalid(element string) bool {
+    return strings.Contains(element, "KOPI") || slices.Contains(INVALID_ELEMENTS, element)
+}
diff --git a/kdvh_importer/main.go b/kdvh_importer/main.go
index 5290e85e..b249eb76 100644
--- a/kdvh_importer/main.go
+++ b/kdvh_importer/main.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/jessevdk/go-flags"
 	"github.com/joho/godotenv"
+	"kdvh_importer/dump"
 )
 
 // TableInstructions contain metadata on how to treat different tables in KDVH
@@ -33,7 +34,7 @@ func (table *TableInstructions) updateDefaults() {
 }
 
 // List of all the tables we care about
-var TABLE2INSTRUCTIONS = []*TableInstructions{
+var KDVH_TABLE_INSTRUCTIONS = []*TableInstructions{
 	// Section 1: unique tables imported in their entirety
 	{TableName: "T_EDATA", FlagTableName: "T_EFLAG", ElemTableName: "T_ELEM_EDATA", ConvFunc: makeDataPageEdata, ImportUntil: 3000},
 	// all tables below are dumped
@@ -83,16 +84,16 @@ type CmdArgs struct {
 	// Validate bool `long:"validate" description:"perform data validation – if given, imported data will be validated against KDVH"`
 	// ValidateAll bool `long:"validateall" description:"validate all timeseries – if defined, this will run validation for all data tables that have a combined folder"`
 	// ValidateWholeTable bool `long:"validatetable" description:"validate all timeseries – if defined together with validate, this will compare ODA with all KDVH timeseries, not just those found in datadir"`
-	List   ListConfig   `command:"tables" description:"List available tables"`
-	Dump   DumpConfig   `command:"dump" description:"Dump tables from KDVH to CSV"`
-	Import ImportConfig `command:"import" description:"Import dumped CSV files"`
+	List   ListConfig      `command:"tables" description:"List available tables"`
+	Dump   dump.DumpConfig `command:"dump" description:"Dump tables from KDVH to CSV"`
+	Import ImportConfig    `command:"import" description:"Import dumped CSV files"`
 }
 
 type ListConfig struct{}
 
 func (config *ListConfig) Execute(_ []string) error {
 	fmt.Println("Available tables:")
-	for _, table := range TABLE2INSTRUCTIONS {
+	for _, table := range KDVH_TABLE_INSTRUCTIONS {
 		fmt.Println("  -", table.TableName)
 	}
 	return nil
diff --git a/kdvh_importer/utils/email.go b/kdvh_importer/utils/email.go
new file mode 100644
index 00000000..2dd55fb6
--- /dev/null
+++ b/kdvh_importer/utils/email.go
@@ -0,0 +1,59 @@
+package utils
+
+import (
+    "encoding/base64"
+    "fmt"
+    "log/slog"
+    "net/smtp"
+    "os"
+    "runtime/debug"
+    "strings"
+)
+
+func sendEmail(subject, body string, to []string) {
+    // Server and from/to addresses
+    host := "aspmx.l.google.com"
+    port := "25"
+    from := "oda-noreply@met.no"
+
+    // Add stuff to headers and make the message body
+    header := make(map[string]string)
+    header["From"] = from
+    header["To"] = strings.Join(to, ",")
+    header["Subject"] = subject
+    header["MIME-Version"] = "1.0"
+    header["Content-Type"] = "text/plain; charset=\"utf-8\""
+    header["Content-Transfer-Encoding"] = "base64"
+    message := ""
+    for k, v := range header {
+        message += fmt.Sprintf("%s: %s\r\n", k, v)
+    }
+
+    body = body + "\n\n" + fmt.Sprintf("Ran with the following command:\n%s", strings.Join(os.Args, " "))
+    message += "\r\n" + base64.StdEncoding.EncodeToString([]byte(body))
+
+    // Send the email
+    err := smtp.SendMail(host+":"+port, nil, from, to, []byte(message))
+    if err != nil {
+        slog.Error(err.Error())
+        return
+    }
+    slog.Info("Email sent successfully!")
+}
+
+// Sends an email and re-raises the panic
+func SendEmailOnPanic(function string, recipients []string) {
+    if r := recover(); r != nil {
+        if recipients != nil {
+            body := "KDVH importer was unable to finish successfully, and the error was not handled." +
+                " This email is sent from a recover function triggered in " +
+                function +
+                ".\n\nError message: " +
+                fmt.Sprint(r) +
+                "\n\nStack trace:\n\n" +
+                string(debug.Stack())
+            sendEmail("LARD – KDVH importer panicked", body, recipients)
+        }
+        panic(r)
+    }
+}
diff --git a/kdvh_importer/utils/utils.go b/kdvh_importer/utils/utils.go
new file mode 100644
index 00000000..7cd25cbb
--- /dev/null
+++ b/kdvh_importer/utils/utils.go
@@ -0,0 +1,42 @@
+package utils
+
+import (
+    "fmt"
+    "log"
+    "log/slog"
+    "os"
+    "slices"
+)
+
+// Filters elements of a slice by comparing them to the elements of a reference slice.
+// formatMsg is an optional format string with a single format argument that can be used
+// to add context on why the element may be missing from the reference slice
+func FilterSlice[T comparable](slice, reference []T, formatMsg string) []T {
+    if slice == nil {
+        return reference
+    }
+
+    if formatMsg == "" {
+        formatMsg = "User input '%s' not present in reference, skipping"
+    }
+
+    out := make([]T, 0, len(slice))
+    for _, s := range slice {
+        if !slices.Contains(reference, s) {
+            slog.Warn(fmt.Sprintf(formatMsg, s))
+            continue
+        }
+        out = append(out, s)
+    }
+    return out
+}
+
+func SetLogFile(tableName, procedure string) {
+    filename := fmt.Sprintf("%s_%s_log.txt", tableName, procedure)
+    fh, err := os.Create(filename)
+    if err != nil {
+        slog.Error(fmt.Sprintf("Could not create log '%s': %s", filename, err))
+        return
+    }
+    log.SetOutput(fh)
+}
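
For reference, a quick usage sketch of `utils.FilterSlice` (not part of the patch; the station numbers are made up) — note that `formatMsg` must contain exactly one `%s` verb, matching the call site in `dump/kdvh.go`:

```go
package main

import (
	"fmt"

	"kdvh_importer/utils"
)

func main() {
	// Hypothetical user input and reference values
	requested := []string{"18700", "99999"}
	available := []string{"18700", "50540"}

	// "99999" is warned about and dropped; a nil `requested`
	// slice would return `available` unchanged
	kept := utils.FilterSlice(requested, available, "Station '%s' not found in table")
	fmt.Println(kept) // [18700]
}
```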