From a8a8117e706c6adff5067309997eacd0ba13db72 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 08:42:43 -0800 Subject: [PATCH 01/19] Renamed files Signed-off-by: thatInfrastructureGuy --- getter.go => gzip.go | 0 sort.go => s3_sort_completed_parts.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename getter.go => gzip.go (100%) rename sort.go => s3_sort_completed_parts.go (100%) diff --git a/getter.go b/gzip.go similarity index 100% rename from getter.go rename to gzip.go diff --git a/sort.go b/s3_sort_completed_parts.go similarity index 100% rename from sort.go rename to s3_sort_completed_parts.go From 595943564085ab56bfa348dfd1b80cb9b72011ec Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 12:07:51 -0800 Subject: [PATCH 02/19] Refactor each step into a separate goroutine Signed-off-by: thatInfrastructureGuy --- config.go | 3 +- csv.go | 94 ++++++++++++++++++++--------------------- gzip.go | 48 +++++++++++++++++++++ preprocess.go | 76 ++++++++++++++++++++++++++++++++++ sqltocsvgzip.go | 108 +++++++++--------------------------------------- 5 files changed, 193 insertions(+), 136 deletions(-) create mode 100644 preprocess.go diff --git a/config.go b/config.go index 5b70b3e..7a8edc9 100644 --- a/config.go +++ b/config.go @@ -49,6 +49,7 @@ type Converter struct { UploadThreads int UploadPartSize int RowCount int64 + Error error s3Svc *s3.S3 s3Resp *s3.CreateMultipartUploadOutput @@ -58,7 +59,7 @@ type Converter struct { gzipBuf []byte partNumber int64 uploadQ chan *obj - quit chan bool + quit chan error } // CsvPreprocessorFunc is a function type for preprocessing your CSV. diff --git a/csv.go b/csv.go index 02fb946..1b3cf44 100644 --- a/csv.go +++ b/csv.go @@ -4,10 +4,13 @@ import ( "bytes" "encoding/csv" "fmt" - "strconv" - "time" ) +type csvBuf struct { + data []byte + lastPart bool +} + func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) { // Same size as sqlRowBatch csvBuffer := bytes.NewBuffer(make([]byte, 0, c.CsvBufferSize)) @@ -23,11 +26,11 @@ func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) { return csvWriter, csvBuffer } -func (c *Converter) setCSVHeaders(csvWriter *csv.Writer) ([]string, int, error) { +func (c *Converter) setCSVHeaders(csvWriter *csv.Writer) ([]string, error) { var headers []string columnNames, err := c.rows.Columns() if err != nil { - return nil, 0, err + return nil, err } if c.WriteHeaders { @@ -43,60 +46,57 @@ func (c *Converter) setCSVHeaders(csvWriter *csv.Writer) ([]string, int, error) // Write to CSV Buffer err = csvWriter.Write(headers) if err != nil { - return nil, 0, err + return nil, err } csvWriter.Flush() - return headers, len(headers), nil + return headers, nil } -func (c *Converter) stringify(values []interface{}) []string { - row := make([]string, len(values), len(values)) +func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf) { + csvWriter, csvBuffer := c.getCSVWriter() + // Set headers + columnNames, err := c.setCSVHeaders(csvWriter) + if err != nil { + close(toGzip) + c.Error = fmt.Errorf("Error setting CSV Headers: ", err) + return + } - for i, rawValue := range values { - if rawValue == nil { - row[i] = "" - continue - } + toCSV <- columnNames - byteArray, ok := rawValue.([]byte) - if ok { - rawValue = string(byteArray) + for row := range toCSV { + c.RowCount = c.RowCount + 1 + + // Write to CSV Buffer + err = csvWriter.Write(row) + if err != nil { + close(toGzip) + c.Error = fmt.Errorf("Error writing to csv buffer: ", err) + return } + csvWriter.Flush() - switch castValue := rawValue.(type) { - case time.Time: - if c.TimeFormat != "" { - row[i] = castValue.Format(c.TimeFormat) + // Convert from csv to gzip + if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) { + toGzip <- &csvBuf{ + data: csvBuffer.Bytes(), + lastPart: false, } - case bool: - row[i] = strconv.FormatBool(castValue) - case string: - row[i] = castValue - case int: - row[i] = strconv.FormatInt(int64(castValue), 10) - case int8: - row[i] = strconv.FormatInt(int64(castValue), 10) - case int16: - row[i] = strconv.FormatInt(int64(castValue), 10) - case int32: - row[i] = strconv.FormatInt(int64(castValue), 10) - case int64: - row[i] = strconv.FormatInt(int64(castValue), 10) - case uint: - row[i] = strconv.FormatUint(uint64(castValue), 10) - case uint8: - row[i] = strconv.FormatUint(uint64(castValue), 10) - case uint16: - row[i] = strconv.FormatUint(uint64(castValue), 10) - case uint32: - row[i] = strconv.FormatUint(uint64(castValue), 10) - case uint64: - row[i] = strconv.FormatUint(uint64(castValue), 10) - default: - row[i] = fmt.Sprintf("%v", castValue) + + // Reset buffer + csvBuffer.Reset() } } - return row + // Flush remaining buffer contents to gzip + toGzip <- &csvBuf{ + data: csvBuffer.Bytes(), + lastPart: true, + } + + // Reset buffer + csvBuffer.Reset() + + close(toGzip) } diff --git a/gzip.go b/gzip.go index 7c9a282..afb95df 100644 --- a/gzip.go +++ b/gzip.go @@ -1,6 +1,8 @@ package sqltocsvgzip import ( + "bytes" + "fmt" "io" "github.com/klauspost/pgzip" @@ -15,3 +17,49 @@ func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) { err = zw.SetConcurrency(c.GzipBatchPerGoroutine, c.GzipGoroutines) return zw, err } + +func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { + zw, err := c.getGzipWriter(w) + if err != nil { + c.Error = fmt.Errorf("Error creating gzip writer: ", err) + return + } + defer zw.Close() + + for csvBuf := range toGzip { + _, err = zw.Write(csvBuf.data) + if err != nil { + c.Error = fmt.Errorf("Error writing to gzip buffer: ", err) + return + } + err = zw.Flush() + if err != nil { + c.Error = fmt.Errorf("Error flushing contents to gzip writer: ", err) + return + } + + // Upload partially created file to S3 + // If size of the gzip file exceeds maxFileStorage + if c.S3Upload { + // GZIP writer to underline file.csv.gzip + gzipBuffer, ok := w.(*bytes.Buffer) + if !ok { + c.Error = fmt.Errorf("Expected buffer. Got %T", w) + return + } + + if csvBuf.lastPart || gzipBuffer.Len() >= c.UploadPartSize { + if c.partNumber == 10000 { + c.Error = fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.") + return + } + + // Add to Queue + c.AddToQueue(gzipBuffer, csvBuf.lastPart) + + //Reset writer + gzipBuffer.Reset() + } + } + } +} diff --git a/preprocess.go b/preprocess.go new file mode 100644 index 0000000..ec2d38b --- /dev/null +++ b/preprocess.go @@ -0,0 +1,76 @@ +package sqltocsvgzip + +import ( + "fmt" + "strconv" + "time" +) + +func (c *Converter) stringify(values []interface{}) []string { + row := make([]string, len(values), len(values)) + + for i, rawValue := range values { + if rawValue == nil { + row[i] = "" + continue + } + + byteArray, ok := rawValue.([]byte) + if ok { + rawValue = string(byteArray) + } + + switch castValue := rawValue.(type) { + case time.Time: + if c.TimeFormat != "" { + row[i] = castValue.Format(c.TimeFormat) + } + case bool: + row[i] = strconv.FormatBool(castValue) + case string: + row[i] = castValue + case int: + row[i] = strconv.FormatInt(int64(castValue), 10) + case int8: + row[i] = strconv.FormatInt(int64(castValue), 10) + case int16: + row[i] = strconv.FormatInt(int64(castValue), 10) + case int32: + row[i] = strconv.FormatInt(int64(castValue), 10) + case int64: + row[i] = strconv.FormatInt(int64(castValue), 10) + case uint: + row[i] = strconv.FormatUint(uint64(castValue), 10) + case uint8: + row[i] = strconv.FormatUint(uint64(castValue), 10) + case uint16: + row[i] = strconv.FormatUint(uint64(castValue), 10) + case uint32: + row[i] = strconv.FormatUint(uint64(castValue), 10) + case uint64: + row[i] = strconv.FormatUint(uint64(castValue), 10) + default: + row[i] = fmt.Sprintf("%v", castValue) + } + } + + return row +} + +func (c *Converter) preProcessRows(toPreprocess chan []interface{}, columnNames []string, toCSV chan []string) { + var writeRow bool + + for values := range toPreprocess { + row := c.stringify(values) + + if c.rowPreProcessor != nil { + writeRow, row = c.rowPreProcessor(row, columnNames) + } + + if writeRow { + toCSV <- row + } + } + + close(toCSV) +} diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index 0978f39..02a278d 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -55,7 +55,7 @@ func (c *Converter) Upload() error { wg := sync.WaitGroup{} buf := bytes.Buffer{} c.uploadQ = make(chan *obj, c.UploadThreads) - c.quit = make(chan bool, 1) + c.quit = make(chan error, 1) // Upload Parts to S3 for i := 0; i < c.UploadThreads; i++ { @@ -139,43 +139,40 @@ func (c *Converter) WriteFile(csvGzipFileName string) error { } // Write writes the csv.gzip to the Writer provided -func (c *Converter) Write(w io.Writer) error { - writeRow := true +func (c *Converter) Write(w io.Writer) (err error) { interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) defer signal.Stop(interrupt) - csvWriter, csvBuffer := c.getCSVWriter() + toPreprocess := make(chan []interface{}) + toCSV := make(chan []string) + toGzip := make(chan *csvBuf) - // Set headers - columnNames, totalColumns, err := c.setCSVHeaders(csvWriter) - if err != nil { - return err - } + go c.rowToCSV(toCSV, toGzip) + columnNames := <-toCSV + + go c.preProcessRows(toPreprocess, columnNames, toCSV) + go c.csvToGzip(toGzip, w) // Buffers for each iteration - values := make([]interface{}, totalColumns, totalColumns) - valuePtrs := make([]interface{}, totalColumns, totalColumns) + values := make([]interface{}, len(columnNames), len(columnNames)) + valuePtrs := make([]interface{}, len(columnNames), len(columnNames)) for i := range columnNames { valuePtrs[i] = &values[i] } - zw, err := c.getGzipWriter(w) - if err != nil { - return err - } - defer zw.Close() - // Iterate over sql rows for c.rows.Next() { select { - case <-c.quit: + case err := <-c.quit: + close(toPreprocess) c.abortMultipartUpload() - return fmt.Errorf("Received quit signal. Exiting.") + return fmt.Errorf("Error received: %v\n", err) case <-interrupt: + close(toPreprocess) c.abortMultipartUpload() - return fmt.Errorf("Received quit signal. Exiting.") + return fmt.Errorf("Received os interrupt signal. Exiting.") default: // Do nothing } @@ -184,60 +181,7 @@ func (c *Converter) Write(w io.Writer) error { return err } - row := c.stringify(values) - - if c.rowPreProcessor != nil { - writeRow, row = c.rowPreProcessor(row, columnNames) - } - - if writeRow { - c.RowCount = c.RowCount + 1 - - // Write to CSV Buffer - err = csvWriter.Write(row) - if err != nil { - return err - } - csvWriter.Flush() - - // Convert from csv to gzip - // Writes from buffer to underlying file - if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) { - _, err = zw.Write(csvBuffer.Bytes()) - if err != nil { - return err - } - err = zw.Flush() - if err != nil { - return err - } - - // Reset buffer - csvBuffer.Reset() - - // Upload partially created file to S3 - // If size of the gzip file exceeds maxFileStorage - if c.S3Upload { - // GZIP writer to underline file.csv.gzip - gzipBuffer, ok := w.(*bytes.Buffer) - if !ok { - return fmt.Errorf("Expected buffer. Got %T", w) - } - - if gzipBuffer.Len() >= c.UploadPartSize { - if c.partNumber == 10000 { - return fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.") - } - - // Add to Queue - c.AddToQueue(gzipBuffer, false) - - //Reset writer - gzipBuffer.Reset() - } - } - } - } + toPreprocess <- values } err = c.rows.Err() @@ -245,17 +189,7 @@ func (c *Converter) Write(w io.Writer) error { return err } - _, err = zw.Write(csvBuffer.Bytes()) - if err != nil { - return err - } - err = zw.Close() - if err != nil { - return err - } - - //Wipe the buffer - csvBuffer.Reset() + close(toPreprocess) // Upload last part of the file to S3 if c.S3Upload { @@ -333,9 +267,7 @@ func (c *Converter) UploadPart() (err error) { for s3obj := range c.uploadQ { err = c.uploadPart(s3obj.partNumber, s3obj.buf, mu) if err != nil { - c.writeLog(Error, "Error occurred. Sending quit signal to writer.") - c.quit <- true - c.abortMultipartUpload() + c.quit <- fmt.Errorf("Error uploading part: %v\n", err) return err } } From df58750736e2d3c6b541aac84a8b865cfdcdc927 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 12:49:47 -0800 Subject: [PATCH 03/19] WriteRow is set to true. Signed-off-by: thatInfrastructureGuy --- gzip.go | 14 +++++++------- preprocess.go | 2 +- sqltocsvgzip.go | 19 ++++--------------- 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/gzip.go b/gzip.go index afb95df..6c0c7b1 100644 --- a/gzip.go +++ b/gzip.go @@ -19,6 +19,13 @@ func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) { } func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { + gzipBuffer, ok := w.(*bytes.Buffer) + if !ok { + c.Error = fmt.Errorf("Expected buffer. Got %T", w) + return + } + + // GZIP writer to underline file.csv.gzip zw, err := c.getGzipWriter(w) if err != nil { c.Error = fmt.Errorf("Error creating gzip writer: ", err) @@ -41,13 +48,6 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { // Upload partially created file to S3 // If size of the gzip file exceeds maxFileStorage if c.S3Upload { - // GZIP writer to underline file.csv.gzip - gzipBuffer, ok := w.(*bytes.Buffer) - if !ok { - c.Error = fmt.Errorf("Expected buffer. Got %T", w) - return - } - if csvBuf.lastPart || gzipBuffer.Len() >= c.UploadPartSize { if c.partNumber == 10000 { c.Error = fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.") diff --git a/preprocess.go b/preprocess.go index ec2d38b..653829d 100644 --- a/preprocess.go +++ b/preprocess.go @@ -58,7 +58,7 @@ func (c *Converter) stringify(values []interface{}) []string { } func (c *Converter) preProcessRows(toPreprocess chan []interface{}, columnNames []string, toCSV chan []string) { - var writeRow bool + writeRow := true for values := range toPreprocess { row := c.stringify(values) diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index 02a278d..927cefa 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -191,27 +191,16 @@ func (c *Converter) Write(w io.Writer) (err error) { close(toPreprocess) - // Upload last part of the file to S3 + // Log the total number of rows processed. + c.writeLog(Info, fmt.Sprintf("Total sql rows processed: %v", c.RowCount)) + + // Check if we need to do direct upload if c.S3Upload { if c.partNumber == 0 { return nil } - - // GZIP writer to underline file.csv.gzip - gzipBuffer, ok := w.(*bytes.Buffer) - if !ok { - return fmt.Errorf("Expected buffer. Got %T", w) - } - - // Add to Queue for multipart upload - c.AddToQueue(gzipBuffer, true) - - //Reset writer - gzipBuffer.Reset() } - // Log the total number of rows processed. - c.writeLog(Info, fmt.Sprintf("Total sql rows processed: %v", c.RowCount)) return nil } From 3e7e1e92a60b97358dc87a0cc5ef2e65ad120135 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 16:51:48 -0800 Subject: [PATCH 04/19] closing queue after gzip have been added. Signed-off-by: thatInfrastructureGuy --- config.go | 2 ++ gzip.go | 2 ++ sqltocsvgzip.go | 18 ++++++------------ 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/config.go b/config.go index 7a8edc9..2a838cd 100644 --- a/config.go +++ b/config.go @@ -81,6 +81,7 @@ func (c *Converter) SetRowPreProcessor(processor CsvPreProcessorFunc) { func WriteConfig(rows *sql.Rows) *Converter { return &Converter{ rows: rows, + quit: make(chan error, 1), WriteHeaders: true, Delimiter: ',', CsvBufferSize: 10 * 1024 * 1024, @@ -95,6 +96,7 @@ func WriteConfig(rows *sql.Rows) *Converter { func UploadConfig(rows *sql.Rows) *Converter { return &Converter{ rows: rows, + quit: make(chan error, 1), WriteHeaders: true, Delimiter: ',', CompressionLevel: flate.DefaultCompression, diff --git a/gzip.go b/gzip.go index 6c0c7b1..e88856b 100644 --- a/gzip.go +++ b/gzip.go @@ -61,5 +61,7 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { gzipBuffer.Reset() } } + close(c.uploadQ) } + } diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index 927cefa..f8c9228 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -52,23 +52,19 @@ func (c *Converter) Upload() error { return err } - wg := sync.WaitGroup{} - buf := bytes.Buffer{} c.uploadQ = make(chan *obj, c.UploadThreads) - c.quit = make(chan error, 1) + wg := sync.WaitGroup{} // Upload Parts to S3 for i := 0; i < c.UploadThreads; i++ { wg.Add(1) go func() { defer wg.Done() - err = c.UploadPart() - if err != nil { - c.writeLog(Error, err.Error()) - } + c.UploadPart() }() } + buf := bytes.Buffer{} err = c.Write(&buf) if err != nil { // Abort S3 Upload @@ -79,7 +75,6 @@ func (c *Converter) Upload() error { return err } - close(c.uploadQ) wg.Wait() if c.partNumber == 0 { @@ -251,17 +246,16 @@ func (c *Converter) AddToQueue(buf *bytes.Buffer, lastPart bool) { // UploadPart listens to upload queue. Whenever an obj is received, // it is then uploaded to AWS. // Abort operation is called if any error is received. -func (c *Converter) UploadPart() (err error) { +func (c *Converter) UploadPart() { mu := &sync.RWMutex{} for s3obj := range c.uploadQ { - err = c.uploadPart(s3obj.partNumber, s3obj.buf, mu) + err := c.uploadPart(s3obj.partNumber, s3obj.buf, mu) if err != nil { c.quit <- fmt.Errorf("Error uploading part: %v\n", err) - return err + return } } c.writeLog(Debug, "Received closed signal") - return } // writeLog decides whether to write a log to stdout depending on LogLevel. From 3864897fda2ee28bea73a40e526aa12073d1f588 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 17:01:03 -0800 Subject: [PATCH 05/19] Close upload channel after upload. Signed-off-by: thatInfrastructureGuy --- gzip.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gzip.go b/gzip.go index e88856b..fe39cbe 100644 --- a/gzip.go +++ b/gzip.go @@ -61,7 +61,10 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { gzipBuffer.Reset() } } - close(c.uploadQ) } + // Close channel. + if c.S3Upload { + close(c.uploadQ) + } } From 0a7d387abdf5d1e0210d0cddb4569f62a316489c Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 17:01:48 -0800 Subject: [PATCH 06/19] Amend comment. Signed-off-by: thatInfrastructureGuy --- gzip.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gzip.go b/gzip.go index fe39cbe..ff29bcf 100644 --- a/gzip.go +++ b/gzip.go @@ -63,7 +63,7 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { } } - // Close channel. + // Close channel after sending complete. if c.S3Upload { close(c.uploadQ) } From 4d7c6fc4dd314afe31ba97a8fcb5cd52b847fd2b Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 17:37:11 -0800 Subject: [PATCH 07/19] Close gzip writer after done. Signed-off-by: thatInfrastructureGuy --- gzip.go | 9 +++++++++ sqltocsvgzip.go | 8 -------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/gzip.go b/gzip.go index ff29bcf..d388bc1 100644 --- a/gzip.go +++ b/gzip.go @@ -39,12 +39,21 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { c.Error = fmt.Errorf("Error writing to gzip buffer: ", err) return } + err = zw.Flush() if err != nil { c.Error = fmt.Errorf("Error flushing contents to gzip writer: ", err) return } + if csvBuf.lastPart { + err = zw.Close() + if err != nil { + c.Error = fmt.Errorf("Error closing gzip writer: ", err) + return + } + } + // Upload partially created file to S3 // If size of the gzip file exceeds maxFileStorage if c.S3Upload { diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index f8c9228..1d21f48 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -188,14 +188,6 @@ func (c *Converter) Write(w io.Writer) (err error) { // Log the total number of rows processed. c.writeLog(Info, fmt.Sprintf("Total sql rows processed: %v", c.RowCount)) - - // Check if we need to do direct upload - if c.S3Upload { - if c.partNumber == 0 { - return nil - } - } - return nil } From 6f935d57c2061c794e5d1d68f93b4f1864e58048 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 17:57:51 -0800 Subject: [PATCH 08/19] Use env var LOG_LEVEL Signed-off-by: thatInfrastructureGuy --- config.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/config.go b/config.go index 2a838cd..3afedea 100644 --- a/config.go +++ b/config.go @@ -75,6 +75,23 @@ func (c *Converter) SetRowPreProcessor(processor CsvPreProcessorFunc) { c.rowPreProcessor = processor } +func getLogLevel() (level LogLevel) { + levels := map[string]LogLevel{ + "ERROR": Error, + "WARN": Warn, + "INFO": Info, + "DEBUG": Debug, + "VERBOSE": Verbose, + } + + var ok bool + if level, ok = levels[os.Getenv("LOG_LEVEL")]; !ok { + level = Info + } + + return +} + // WriteConfig will return a Converter which will write your CSV however you like // but will allow you to set a bunch of non-default behaivour like overriding // headers or injecting a pre-processing step into your conversion @@ -88,7 +105,7 @@ func WriteConfig(rows *sql.Rows) *Converter { CompressionLevel: flate.DefaultCompression, GzipGoroutines: runtime.GOMAXPROCS(0), // Should be atleast the number of cores. Not sure how it impacts cgroup limits. GzipBatchPerGoroutine: 512 * 1024, // Should be atleast 100K - LogLevel: Info, + LogLevel: getLogLevel(), } } @@ -103,7 +120,7 @@ func UploadConfig(rows *sql.Rows) *Converter { CsvBufferSize: 10 * 1024 * 1024, GzipGoroutines: runtime.GOMAXPROCS(0), // Should be atleast the number of cores. Not sure how it impacts cgroup limits. GzipBatchPerGoroutine: 512 * 1024, // Should be atleast 100K - LogLevel: Info, + LogLevel: getLogLevel(), S3Upload: true, UploadThreads: 4, UploadPartSize: 50 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024 for s3 upload From 1d30a6679172ec2a3568df793228b553a26181c9 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 18:20:51 -0800 Subject: [PATCH 09/19] Return Row Count Signed-off-by: thatInfrastructureGuy --- sqltocsvgzip.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index 1d21f48..f657440 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -19,7 +19,7 @@ import ( // WriteFile will write a CSV.GZIP file to the file name specified (with headers) // based on whatever is in the sql.Rows you pass in. -func WriteFile(csvGzipFileName string, rows *sql.Rows) error { +func WriteFile(csvGzipFileName string, rows *sql.Rows) (rowCount int64, err error) { return WriteConfig(rows).WriteFile(csvGzipFileName) } @@ -28,7 +28,7 @@ func WriteFile(csvGzipFileName string, rows *sql.Rows) error { // UploadToS3 looks for the following environment variables. // Required: S3_BUCKET, S3_PATH, S3_REGION // Optional: S3_ACL (default => bucket-owner-full-control) -func UploadToS3(rows *sql.Rows) error { +func UploadToS3(rows *sql.Rows) (rowCount int64, err error) { return UploadConfig(rows).Upload() } @@ -36,20 +36,20 @@ func UploadToS3(rows *sql.Rows) error { // Creates a Multipart AWS requests. // Completes the multipart request if all uploads are successful. // Aborts the operation when an error is received. -func (c *Converter) Upload() error { +func (c *Converter) Upload() (rowCount int64, err error) { if c.UploadPartSize < minFileSize { - return fmt.Errorf("UploadPartSize should be greater than %v\n", minFileSize) + return 0, fmt.Errorf("UploadPartSize should be greater than %v\n", minFileSize) } // Create MultiPart S3 Upload - err := c.createS3Session() + err = c.createS3Session() if err != nil { - return err + return 0, err } err = c.createMultipartRequest() if err != nil { - return err + return 0, err } c.uploadQ = make(chan *obj, c.UploadThreads) @@ -70,9 +70,9 @@ func (c *Converter) Upload() error { // Abort S3 Upload awserr := c.abortMultipartUpload() if awserr != nil { - return awserr + return 0, awserr } - return err + return 0, err } wg.Wait() @@ -82,14 +82,14 @@ func (c *Converter) Upload() error { c.writeLog(Info, "Gzip file < 5 MB. Enable direct upload. Abort multipart upload.") err = c.abortMultipartUpload() if err != nil { - return err + return 0, err } err = c.UploadObjectToS3(&buf) if err != nil { - return err + return 0, err } - return nil + return c.RowCount, nil } // Sort completed parts @@ -100,25 +100,25 @@ func (c *Converter) Upload() error { // Abort S3 Upload awserr := c.abortMultipartUpload() if awserr != nil { - return awserr + return 0, awserr } - return err + return 0, err } uploadPath, err := url.PathUnescape(completeResponse.String()) if err != nil { - return err + return 0, err } c.writeLog(Info, "Successfully uploaded file: "+uploadPath) - return nil + return c.RowCount, nil } // WriteFile writes the csv.gzip to the filename specified, return an error if problem -func (c *Converter) WriteFile(csvGzipFileName string) error { +func (c *Converter) WriteFile(csvGzipFileName string) (rowCount int64, err error) { f, err := os.Create(csvGzipFileName) if err != nil { - return err + return 0, err } defer f.Close() @@ -127,10 +127,10 @@ func (c *Converter) WriteFile(csvGzipFileName string) error { err = c.Write(f) if err != nil { - return err + return 0, err } - return nil + return c.RowCount, nil } // Write writes the csv.gzip to the Writer provided From 22aba36ddfa1072a89f86e48bc21f92f2565b2a1 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 18:50:18 -0800 Subject: [PATCH 10/19] Add debug logs. Signed-off-by: thatInfrastructureGuy --- config.go | 1 - csv.go | 6 ++++-- gzip.go | 21 ++++++++------------- preprocess.go | 1 + sqltocsvgzip.go | 1 + 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/config.go b/config.go index 3afedea..e83a61f 100644 --- a/config.go +++ b/config.go @@ -49,7 +49,6 @@ type Converter struct { UploadThreads int UploadPartSize int RowCount int64 - Error error s3Svc *s3.S3 s3Resp *s3.CreateMultipartUploadOutput diff --git a/csv.go b/csv.go index 1b3cf44..884b0e1 100644 --- a/csv.go +++ b/csv.go @@ -59,7 +59,7 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf) { columnNames, err := c.setCSVHeaders(csvWriter) if err != nil { close(toGzip) - c.Error = fmt.Errorf("Error setting CSV Headers: ", err) + c.quit <- fmt.Errorf("Error setting CSV Headers: ", err) return } @@ -72,13 +72,14 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf) { err = csvWriter.Write(row) if err != nil { close(toGzip) - c.Error = fmt.Errorf("Error writing to csv buffer: ", err) + c.quit <- fmt.Errorf("Error writing to csv buffer: ", err) return } csvWriter.Flush() // Convert from csv to gzip if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) { + fmt.Println("[DEBUG] csvBuffer Len:", csvBuffer.Len()) toGzip <- &csvBuf{ data: csvBuffer.Bytes(), lastPart: false, @@ -89,6 +90,7 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf) { } } + fmt.Println("[DEBUG] csvBuffer Len:", csvBuffer.Len()) // Flush remaining buffer contents to gzip toGzip <- &csvBuf{ data: csvBuffer.Bytes(), diff --git a/gzip.go b/gzip.go index d388bc1..9ccc9a1 100644 --- a/gzip.go +++ b/gzip.go @@ -21,45 +21,40 @@ func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) { func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { gzipBuffer, ok := w.(*bytes.Buffer) if !ok { - c.Error = fmt.Errorf("Expected buffer. Got %T", w) + c.quit <- fmt.Errorf("Expected buffer. Got %T", w) return } // GZIP writer to underline file.csv.gzip zw, err := c.getGzipWriter(w) if err != nil { - c.Error = fmt.Errorf("Error creating gzip writer: ", err) + c.quit <- fmt.Errorf("Error creating gzip writer: ", err) return } defer zw.Close() for csvBuf := range toGzip { - _, err = zw.Write(csvBuf.data) + n, err := zw.Write(csvBuf.data) + fmt.Println("[DEBUG] zw written:", n) if err != nil { - c.Error = fmt.Errorf("Error writing to gzip buffer: ", err) + c.quit <- fmt.Errorf("Error writing to gzip buffer: ", err) return } err = zw.Flush() if err != nil { - c.Error = fmt.Errorf("Error flushing contents to gzip writer: ", err) + c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) return } - if csvBuf.lastPart { - err = zw.Close() - if err != nil { - c.Error = fmt.Errorf("Error closing gzip writer: ", err) - return - } - } + fmt.Println("[DEBUG] zw flushed:", zw.UncompressedSize()) // Upload partially created file to S3 // If size of the gzip file exceeds maxFileStorage if c.S3Upload { if csvBuf.lastPart || gzipBuffer.Len() >= c.UploadPartSize { if c.partNumber == 10000 { - c.Error = fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.") + c.quit <- fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.") return } diff --git a/preprocess.go b/preprocess.go index 653829d..7f84abc 100644 --- a/preprocess.go +++ b/preprocess.go @@ -68,6 +68,7 @@ func (c *Converter) preProcessRows(toPreprocess chan []interface{}, columnNames } if writeRow { + fmt.Println("[DEBUG] row:", row) toCSV <- row } } diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index f657440..a693a25 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -176,6 +176,7 @@ func (c *Converter) Write(w io.Writer) (err error) { return err } + fmt.Println("[DEBUG] Sending row to preprocess: ", c.RowCount) toPreprocess <- values } From e2b61cfe5870fefe2e7d962f0f0c505221e2e366 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 19:09:56 -0800 Subject: [PATCH 11/19] Use gzip buffer only for s3 upload. Signed-off-by: thatInfrastructureGuy --- gzip.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/gzip.go b/gzip.go index 9ccc9a1..f38f768 100644 --- a/gzip.go +++ b/gzip.go @@ -19,10 +19,14 @@ func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) { } func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { - gzipBuffer, ok := w.(*bytes.Buffer) - if !ok { - c.quit <- fmt.Errorf("Expected buffer. Got %T", w) - return + var gzipBuffer *bytes.Buffer + if c.S3Upload { + var ok bool + gzipBuffer, ok = w.(*bytes.Buffer) + if !ok { + c.quit <- fmt.Errorf("Expected buffer. Got %T", w) + return + } } // GZIP writer to underline file.csv.gzip @@ -36,6 +40,7 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { for csvBuf := range toGzip { n, err := zw.Write(csvBuf.data) fmt.Println("[DEBUG] zw written:", n) + fmt.Println("[DEBUG] zw flushed:", zw.UncompressedSize()) if err != nil { c.quit <- fmt.Errorf("Error writing to gzip buffer: ", err) return @@ -47,8 +52,6 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { return } - fmt.Println("[DEBUG] zw flushed:", zw.UncompressedSize()) - // Upload partially created file to S3 // If size of the gzip file exceeds maxFileStorage if c.S3Upload { From 668ce48ffb05a6a001e9ba2f4ed6358a4822567c Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 19:20:46 -0800 Subject: [PATCH 12/19] Remove debug log. Signed-off-by: thatInfrastructureGuy --- csv.go | 2 -- gzip.go | 4 +--- preprocess.go | 1 - sqltocsvgzip.go | 1 - 4 files changed, 1 insertion(+), 7 deletions(-) diff --git a/csv.go b/csv.go index 884b0e1..c25c007 100644 --- a/csv.go +++ b/csv.go @@ -79,7 +79,6 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf) { // Convert from csv to gzip if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) { - fmt.Println("[DEBUG] csvBuffer Len:", csvBuffer.Len()) toGzip <- &csvBuf{ data: csvBuffer.Bytes(), lastPart: false, @@ -90,7 +89,6 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf) { } } - fmt.Println("[DEBUG] csvBuffer Len:", csvBuffer.Len()) // Flush remaining buffer contents to gzip toGzip <- &csvBuf{ data: csvBuffer.Bytes(), diff --git a/gzip.go b/gzip.go index f38f768..2653b92 100644 --- a/gzip.go +++ b/gzip.go @@ -38,9 +38,7 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { defer zw.Close() for csvBuf := range toGzip { - n, err := zw.Write(csvBuf.data) - fmt.Println("[DEBUG] zw written:", n) - fmt.Println("[DEBUG] zw flushed:", zw.UncompressedSize()) + _, err = zw.Write(csvBuf.data) if err != nil { c.quit <- fmt.Errorf("Error writing to gzip buffer: ", err) return diff --git a/preprocess.go b/preprocess.go index 7f84abc..653829d 100644 --- a/preprocess.go +++ b/preprocess.go @@ -68,7 +68,6 @@ func (c *Converter) preProcessRows(toPreprocess chan []interface{}, columnNames } if writeRow { - fmt.Println("[DEBUG] row:", row) toCSV <- row } } diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index a693a25..f657440 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -176,7 +176,6 @@ func (c *Converter) Write(w io.Writer) (err error) { return err } - fmt.Println("[DEBUG] Sending row to preprocess: ", c.RowCount) toPreprocess <- values } From d0e4239f7d36f5fd7262b3321cd15f01cbc6c4a9 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 19:27:45 -0800 Subject: [PATCH 13/19] Close gzip writer when last part is encountered. Signed-off-by: thatInfrastructureGuy --- gzip.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/gzip.go b/gzip.go index 2653b92..6d5bbbf 100644 --- a/gzip.go +++ b/gzip.go @@ -50,6 +50,14 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { return } + if csvBuf.lastPart { + err = zw.Close() + if err != nil { + c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) + return + } + } + // Upload partially created file to S3 // If size of the gzip file exceeds maxFileStorage if c.S3Upload { From 622385b3dca3304cbe30b35ff152e9a8ab892bb9 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 19:56:37 -0800 Subject: [PATCH 14/19] Wait for goroutines to finish Signed-off-by: thatInfrastructureGuy --- csv.go | 4 +++- gzip.go | 4 +++- preprocess.go | 4 +++- sqltocsvgzip.go | 14 +++++++++----- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/csv.go b/csv.go index c25c007..79dbf80 100644 --- a/csv.go +++ b/csv.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/csv" "fmt" + "sync" ) type csvBuf struct { @@ -53,7 +54,8 @@ func (c *Converter) setCSVHeaders(csvWriter *csv.Writer) ([]string, error) { return headers, nil } -func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf) { +func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync.WaitGroup) { + defer wg.Done() csvWriter, csvBuffer := c.getCSVWriter() // Set headers columnNames, err := c.setCSVHeaders(csvWriter) diff --git a/gzip.go b/gzip.go index 6d5bbbf..0b97b8c 100644 --- a/gzip.go +++ b/gzip.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "sync" "github.com/klauspost/pgzip" ) @@ -18,7 +19,8 @@ func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) { return zw, err } -func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer) { +func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer, wg *sync.WaitGroup) { + defer wg.Done() var gzipBuffer *bytes.Buffer if c.S3Upload { var ok bool diff --git a/preprocess.go b/preprocess.go index 653829d..285ebcc 100644 --- a/preprocess.go +++ b/preprocess.go @@ -3,6 +3,7 @@ package sqltocsvgzip import ( "fmt" "strconv" + "sync" "time" ) @@ -57,7 +58,8 @@ func (c *Converter) stringify(values []interface{}) []string { return row } -func (c *Converter) preProcessRows(toPreprocess chan []interface{}, columnNames []string, toCSV chan []string) { +func (c *Converter) preProcessRows(toPreprocess chan []interface{}, columnNames []string, toCSV chan []string, wg *sync.WaitGroup) { + defer wg.Done() writeRow := true for values := range toPreprocess { diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index f657440..5f6a177 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -53,7 +53,7 @@ func (c *Converter) Upload() (rowCount int64, err error) { } c.uploadQ = make(chan *obj, c.UploadThreads) - wg := sync.WaitGroup{} + wg := &sync.WaitGroup{} // Upload Parts to S3 for i := 0; i < c.UploadThreads; i++ { @@ -143,11 +143,13 @@ func (c *Converter) Write(w io.Writer) (err error) { toCSV := make(chan []string) toGzip := make(chan *csvBuf) - go c.rowToCSV(toCSV, toGzip) + // Create 3 goroutines + wg := &sync.WaitGroup{} + wg.Add(3) + go c.rowToCSV(toCSV, toGzip, wg) columnNames := <-toCSV - - go c.preProcessRows(toPreprocess, columnNames, toCSV) - go c.csvToGzip(toGzip, w) + go c.preProcessRows(toPreprocess, columnNames, toCSV, wg) + go c.csvToGzip(toGzip, w, wg) // Buffers for each iteration values := make([]interface{}, len(columnNames), len(columnNames)) @@ -186,6 +188,8 @@ func (c *Converter) Write(w io.Writer) (err error) { close(toPreprocess) + wg.Wait() + // Log the total number of rows processed. c.writeLog(Info, fmt.Sprintf("Total sql rows processed: %v", c.RowCount)) return nil From f77fba683ec4712359b6f41109fbc68537c2f460 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 20:33:17 -0800 Subject: [PATCH 15/19] Dont flush and close Signed-off-by: thatInfrastructureGuy --- gzip.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/gzip.go b/gzip.go index 0b97b8c..961456b 100644 --- a/gzip.go +++ b/gzip.go @@ -46,18 +46,18 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer, wg *sync.WaitGro return } - err = zw.Flush() - if err != nil { - c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) - return - } - if csvBuf.lastPart { err = zw.Close() if err != nil { c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) return } + } else { + err = zw.Flush() + if err != nil { + c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) + return + } } // Upload partially created file to S3 From c4c0da07f1b54ecdadf51c26dd71c5f74546d800 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 23:03:17 -0800 Subject: [PATCH 16/19] Errorf args Signed-off-by: thatInfrastructureGuy --- gzip.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gzip.go b/gzip.go index 961456b..13be031 100644 --- a/gzip.go +++ b/gzip.go @@ -34,7 +34,7 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer, wg *sync.WaitGro // GZIP writer to underline file.csv.gzip zw, err := c.getGzipWriter(w) if err != nil { - c.quit <- fmt.Errorf("Error creating gzip writer: ", err) + c.quit <- fmt.Errorf("Error creating gzip writer: %v", err) return } defer zw.Close() @@ -42,20 +42,20 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer, wg *sync.WaitGro for csvBuf := range toGzip { _, err = zw.Write(csvBuf.data) if err != nil { - c.quit <- fmt.Errorf("Error writing to gzip buffer: ", err) + c.quit <- fmt.Errorf("Error writing to gzip buffer: %v", err) return } if csvBuf.lastPart { err = zw.Close() if err != nil { - c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) + c.quit <- fmt.Errorf("Error flushing contents to gzip writer: %v", err) return } } else { err = zw.Flush() if err != nil { - c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) + c.quit <- fmt.Errorf("Error flushing contents to gzip writer: %v", err) return } } From 8f1a201faae1909242aad7cceef788c905a86b38 Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Mon, 18 Jan 2021 23:04:15 -0800 Subject: [PATCH 17/19] Errorf args Signed-off-by: thatInfrastructureGuy --- csv.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/csv.go b/csv.go index 79dbf80..3225997 100644 --- a/csv.go +++ b/csv.go @@ -61,7 +61,7 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync. columnNames, err := c.setCSVHeaders(csvWriter) if err != nil { close(toGzip) - c.quit <- fmt.Errorf("Error setting CSV Headers: ", err) + c.quit <- fmt.Errorf("Error setting CSV Headers: %v", err) return } @@ -74,7 +74,7 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync. err = csvWriter.Write(row) if err != nil { close(toGzip) - c.quit <- fmt.Errorf("Error writing to csv buffer: ", err) + c.quit <- fmt.Errorf("Error writing to csv buffer: %v", err) return } csvWriter.Flush() From ef4399f0bd49f35b3877d4c4223ff38a1c46c89d Mon Sep 17 00:00:00 2001 From: thatInfrastructureGuy Date: Sat, 30 Jan 2021 08:55:47 -0800 Subject: [PATCH 18/19] Use csvBuf value instead of pointer. Signed-off-by: thatInfrastructureGuy --- csv.go | 42 +++++++++++++++++++++--------------------- gzip.go | 2 +- sqltocsvgzip.go | 18 ++++++++++-------- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/csv.go b/csv.go index 3225997..3b0a3d0 100644 --- a/csv.go +++ b/csv.go @@ -27,45 +27,45 @@ func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) { return csvWriter, csvBuffer } -func (c *Converter) setCSVHeaders(csvWriter *csv.Writer) ([]string, error) { - var headers []string +func (c *Converter) getCSVHeaders(csvWriter *csv.Writer) (headers []string, err error) { columnNames, err := c.rows.Columns() if err != nil { return nil, err } - if c.WriteHeaders { - // use Headers if set, otherwise default to - // query Columns - if len(c.Headers) > 0 { - headers = c.Headers - } else { - headers = columnNames - } + // use Headers if set, otherwise default to + // query Columns + if len(c.Headers) > 0 { + headers = c.Headers + } else { + headers = columnNames } // Write to CSV Buffer - err = csvWriter.Write(headers) - if err != nil { - return nil, err + if c.WriteHeaders { + err = csvWriter.Write(headers) + if err != nil { + return nil, err + } + csvWriter.Flush() } - csvWriter.Flush() - return headers, nil + return } -func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync.WaitGroup) { +func (c *Converter) rowToCSV(getHeaders, toCSV chan []string, toGzip chan csvBuf, wg *sync.WaitGroup) { defer wg.Done() csvWriter, csvBuffer := c.getCSVWriter() - // Set headers - columnNames, err := c.setCSVHeaders(csvWriter) + + // Get headers + columnNames, err := c.getCSVHeaders(csvWriter) if err != nil { close(toGzip) c.quit <- fmt.Errorf("Error setting CSV Headers: %v", err) return } - toCSV <- columnNames + getHeaders <- columnNames for row := range toCSV { c.RowCount = c.RowCount + 1 @@ -81,7 +81,7 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync. // Convert from csv to gzip if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) { - toGzip <- &csvBuf{ + toGzip <- csvBuf{ data: csvBuffer.Bytes(), lastPart: false, } @@ -92,7 +92,7 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync. } // Flush remaining buffer contents to gzip - toGzip <- &csvBuf{ + toGzip <- csvBuf{ data: csvBuffer.Bytes(), lastPart: true, } diff --git a/gzip.go b/gzip.go index 13be031..ae9c51e 100644 --- a/gzip.go +++ b/gzip.go @@ -19,7 +19,7 @@ func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) { return zw, err } -func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer, wg *sync.WaitGroup) { +func (c *Converter) csvToGzip(toGzip chan csvBuf, w io.Writer, wg *sync.WaitGroup) { defer wg.Done() var gzipBuffer *bytes.Buffer if c.S3Upload { diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index 5f6a177..af076bc 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -140,22 +140,24 @@ func (c *Converter) Write(w io.Writer) (err error) { defer signal.Stop(interrupt) toPreprocess := make(chan []interface{}) - toCSV := make(chan []string) - toGzip := make(chan *csvBuf) + getHeaders, toCSV := make(chan []string), make(chan []string) + toGzip := make(chan csvBuf) // Create 3 goroutines wg := &sync.WaitGroup{} wg.Add(3) - go c.rowToCSV(toCSV, toGzip, wg) - columnNames := <-toCSV - go c.preProcessRows(toPreprocess, columnNames, toCSV, wg) + + go c.rowToCSV(getHeaders, toCSV, toGzip, wg) + headers := <-getHeaders + + go c.preProcessRows(toPreprocess, headers, toCSV, wg) go c.csvToGzip(toGzip, w, wg) // Buffers for each iteration - values := make([]interface{}, len(columnNames), len(columnNames)) - valuePtrs := make([]interface{}, len(columnNames), len(columnNames)) + values := make([]interface{}, len(headers), len(headers)) + valuePtrs := make([]interface{}, len(headers), len(headers)) - for i := range columnNames { + for i := range headers { valuePtrs[i] = &values[i] } From 05f05c1980db56bb6cfd705cb15f549239e07488 Mon Sep 17 00:00:00 2001 From: Ashish Date: Sat, 15 May 2021 17:58:58 -0700 Subject: [PATCH 19/19] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ad6455a..e04d99b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# [sqltocsvgzip](https://pkg.go.dev/github.com/thatInfrastructureGuy/sqltocsvgzip) [![Build Status](https://travis-ci.com/thatInfrastructureGuy/sqltocsvgzip.svg?branch=master)](https://travis-ci.com/github/thatInfrastructureGuy/sqltocsvgzip) +# [sqltocsvgzip](https://pkg.go.dev/github.com/thatInfrastructureGuy/sqltocsvgzip) ![Build Status](https://github.com/thatInfrastructureGuy/sqltocsvgzip/actions/workflows/go.yml/badge.svg?branch=master) A library designed to convert sql.Rows result from a query into a **CSV.GZIP** file and/or **upload to AWS S3**.