Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into logging-update
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed May 15, 2024
2 parents d93c3ad + 993ea1c commit 68125a4
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 75 deletions.
14 changes: 7 additions & 7 deletions extractors/tabular_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"encoding/csv"
"fmt"
"io"
"log"
"os"
"strings"

"compress/gzip"

"github.com/bmeg/sifter/config"
"github.com/bmeg/sifter/evaluate"
"github.com/bmeg/sifter/logger"
"github.com/bmeg/sifter/task"
)

Expand Down Expand Up @@ -49,7 +49,7 @@ func buildUniqueArray(src []string) []string {
}

func (ml *TableLoadStep) Start(task task.RuntimeTask) (chan map[string]interface{}, error) {
log.Printf("Starting Table Load")
logger.Info("Starting Table Load")
input, err := evaluate.ExpressionString(ml.Input, task.GetConfig(), nil)
if err != nil {
return nil, err
Expand All @@ -62,7 +62,7 @@ func (ml *TableLoadStep) Start(task task.RuntimeTask) (chan map[string]interface
} else if s.IsDir() {
return nil, fmt.Errorf("input not a file: %s", inputPath)
}
log.Printf("Loading table: %s", inputPath)
logger.Info("Loading table", "path", inputPath)

var inputStream io.ReadCloser
if gfile, err := os.Open(inputPath); err == nil {
Expand Down Expand Up @@ -121,9 +121,9 @@ func (ml *TableLoadStep) Start(task task.RuntimeTask) (chan map[string]interface
err = nil
}
} else if pe.Err == csv.ErrQuote {
log.Printf("quote error: %s", record)
logger.Error("quote error", "record", record)
} else if pe.Err == csv.ErrBareQuote {
log.Printf("bare quote error: %s", record)
logger.Error("bare quote error", "record", record)
}
}
}
Expand All @@ -150,10 +150,10 @@ func (ml *TableLoadStep) Start(task task.RuntimeTask) (chan map[string]interface
}
}
} else {
log.Printf("Error: %s", err)
logger.Error("Error", "error", err)
}
}
log.Printf("Done Loading")
logger.Info("Done Loading")
close(procChan)
}()

Expand Down
41 changes: 18 additions & 23 deletions extractors/transpose_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/binary"
"encoding/csv"
"io"
"log"
"os"
"path/filepath"
"strings"
Expand All @@ -15,6 +14,7 @@ import (

"github.com/bmeg/sifter/config"
"github.com/bmeg/sifter/evaluate"
"github.com/bmeg/sifter/logger"
"github.com/bmeg/sifter/task"
"github.com/cockroachdb/pebble"
)
Expand Down Expand Up @@ -101,12 +101,12 @@ func transposeInMem(c csvReader, out chan map[string]any) error {
break
}
if err != nil {
log.Printf("Error %s", err)
logger.Error("Error", "error", err)
break
}
matrix = append(matrix, record)
}
log.Printf("Writing Transpose")
logger.Info("Writing Transpose")
l := len(matrix[0])
h := len(matrix)
columns := make([]string, h)
Expand Down Expand Up @@ -142,7 +142,6 @@ func copyBytes(in []byte) []byte {
}

func (pbw *pebbleBulkWrite) Set(id []byte, val []byte) error {
//log.Printf("Setting %x %s", id, val)
pbw.curSize += len(id) + len(val)
if pbw.highest == nil || bytes.Compare(id, pbw.highest) > 0 {
pbw.highest = copyBytes(id)
Expand All @@ -152,7 +151,7 @@ func (pbw *pebbleBulkWrite) Set(id []byte, val []byte) error {
}
err := pbw.batch.Set(id, val, nil)
if pbw.curSize > maxWriterBuffer {
log.Printf("Running batch Commit")
logger.Info("Running batch Commit")
pbw.batch.Commit(nil)
pbw.batch.Reset()
pbw.curSize = 0
Expand All @@ -161,7 +160,7 @@ func (pbw *pebbleBulkWrite) Set(id []byte, val []byte) error {
}

func (pbw *pebbleBulkWrite) Close() error {
log.Printf("Running batch close Commit")
logger.Info("Running batch close Commit")
err := pbw.batch.Commit(nil)
if err != nil {
return err
Expand Down Expand Up @@ -191,7 +190,7 @@ func transposeInDB(workdir string, c csvReader, out chan map[string]any) error {
colCount := uint64(0)
for row := uint64(0); ; row++ {
if (row % 100) == 0 {
log.Printf("Row: %d", row)
logger.Info("Row", "row", row)
}
record, err := r.Read()
if err == io.EOF {
Expand All @@ -207,22 +206,21 @@ func transposeInDB(workdir string, c csvReader, out chan map[string]any) error {
bCol := make([]byte, 8)
binary.BigEndian.PutUint64(bCol, col)
//key := bytes.Join([][]byte{bRow, bCol}, []byte{})
//log.Printf("Put %x", key)
key := bytes.Join([][]byte{bCol, bRow}, []byte{})
err := pbw.Set(key, []byte(record[col]))
if err != nil {
log.Printf("Put Error: %s", err)
logger.Error("Put Error", "message", err)
}
}
rowCount = row + 1
}
if err := pbw.Close(); err != nil {
log.Print(err)
logger.Error("close error", "message", err)
}

log.Println(db.Metrics().String())
logger.Info(db.Metrics().String())

log.Printf("Col/Row counts: %d %d", colCount, rowCount)
logger.Info("Col/Row counts", "col", colCount, "row", rowCount)

columns := []string{}

Expand All @@ -238,15 +236,13 @@ func transposeInDB(workdir string, c csvReader, out chan map[string]any) error {
columns = append(columns, string(val))
c.Close()
} else {
log.Printf("Column error: %s", err)
logger.Error("Column error", "message", err)
}
}

//log.Printf("Columns: %#v", columns)

for col := uint64(1); col < colCount; col++ {
if (col % 100) == 0 {
log.Printf("Writing Col %d", col)
logger.Info("Writing Col", "id", col)
}
prefix := make([]byte, 8)
binary.BigEndian.PutUint64(prefix, col)
Expand All @@ -259,7 +255,6 @@ func transposeInDB(workdir string, c csvReader, out chan map[string]any) error {
o = append(o, string(r))
}
it.Close()
//log.Printf("Col width: %d %d", len(columns), len(o))
if len(o) == len(columns) {
res := make(map[string]any, len(columns))
for i := 0; i < len(o); i++ {
Expand Down Expand Up @@ -292,7 +287,7 @@ func transposeInTable(workdir string, fieldSize int, c csvReader, out chan map[s
writer := bufio.NewWriterSize(table, 1024*10)
for row := int64(0); ; row++ {
if (row % 100) == 0 {
log.Printf("Row: %d", row)
logger.Info("Row", "i", row)
}
record, err := r.Read()
if err == io.EOF {
Expand All @@ -302,7 +297,7 @@ func transposeInTable(workdir string, fieldSize int, c csvReader, out chan map[s
colCount = int64(len(record))
}
if colCount != int64(len(record)) {
log.Printf("Incorrectly sized row: %d != %d", colCount, uint64(len(record)))
logger.Error("Incorrectly sized row", "colCount", colCount, "recordSize", uint64(len(record)))
}
for col := int64(0); col < colCount; col++ {
b := []byte(record[col])
Expand All @@ -316,7 +311,7 @@ func transposeInTable(workdir string, fieldSize int, c csvReader, out chan map[s

stepSize := colCount * int64(fieldSize)

log.Printf("Col/Row counts: %d %d", colCount, rowCount)
logger.Info("Col/Row counts", "col", colCount, "row", rowCount)

columns := []string{}
for row := int64(0); row < rowCount; row++ {
Expand All @@ -326,15 +321,15 @@ func transposeInTable(workdir string, fieldSize int, c csvReader, out chan map[s
if err == nil {
columns = append(columns, string(tmp[0]))
} else {
log.Printf("Column error: %s", err)
logger.Error("Column error", "message", err)
}
}

log.Printf("Columns: %s\n", columns)
logger.Info("Columns", "columns", columns)

for col := int64(1); col < colCount; col++ {
if (col % 100) == 0 {
log.Printf("Writing Col %d", col)
logger.Info("Writing Col", "i", col)
}
record := map[string]any{}
for row := int64(0); row < rowCount; row++ {
Expand Down
36 changes: 18 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
module github.com/bmeg/sifter

replace github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 => github.com/bmeg/jsonschema/v5 v5.3.3

go 1.21

toolchain go1.21.3

require (
github.com/akrylysov/pogreb v0.9.1
github.com/alecthomas/jsonschema v0.0.0-20200514014646-0366d1034a17
Expand All @@ -9,7 +15,7 @@ require (
github.com/bmeg/golib v0.0.0-20200725232156-e799a31439fc
github.com/bmeg/grip v0.0.0-20210910231938-94d69d94ff65
github.com/bmeg/jsonpath v0.0.0-20210207014051-cca5355553ad
github.com/bmeg/jsonschemagraph v0.0.0-20240302044834-7ed232fc40f0
github.com/bmeg/jsonschemagraph v0.0.0-20240326192049-ba75b88572d3
github.com/cockroachdb/pebble v0.0.0-20220311224846-910ce60578df
github.com/dgraph-io/badger/v2 v2.2007.4
github.com/ghodss/yaml v1.0.0
Expand All @@ -18,11 +24,13 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.0
github.com/linkedin/goavro/v2 v2.10.0
github.com/lmittmann/tint v1.0.4
github.com/mattn/go-sqlite3 v1.14.16
github.com/rdleal/intervalst v1.3.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.0
github.com/spf13/cobra v1.6.1
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.30.0
google.golang.org/protobuf v1.33.0
sigs.k8s.io/yaml v1.3.0
vitess.io/vitess v0.16.2
)
Expand All @@ -45,15 +53,15 @@ require (
github.com/cockroachdb/redact v1.0.8 // indirect
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-hclog v1.4.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand All @@ -64,10 +72,9 @@ require (
github.com/influxdata/tdigest v0.0.1 // indirect
github.com/jmoiron/sqlx v1.2.1-0.20190826204134-d7d95172beb5 // indirect
github.com/kennygrant/sanitize v1.2.4 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lmittmann/tint v1.0.4 // indirect
github.com/logrusorgru/aurora v0.0.0-20190428105938-cea283e61946 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
Expand All @@ -82,11 +89,10 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rdleal/intervalst v1.3.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.4.0 // indirect
github.com/segmentio/ksuid v1.0.2 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tinylib/msgp v1.1.8 // indirect
Expand All @@ -95,13 +101,13 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go4.org/intern v0.0.0-20220617035311-6925f38cc365 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.6.0 // indirect
Expand All @@ -112,9 +118,3 @@ require (
inet.af/netaddr v0.0.0-20220811202034-502d2d690317 // indirect
k8s.io/apimachinery v0.26.1 // indirect
)

replace github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 => github.com/bmeg/jsonschema/v5 v5.3.3

go 1.21

toolchain go1.21.5
Loading

0 comments on commit 68125a4

Please sign in to comment.