Feature/refactor #16 (Draft)

Wants to merge 19 commits into base: master.
2 changes: 1 addition & 1 deletion README.md
@@ -1,4 +1,4 @@
# [sqltocsvgzip](https://pkg.go.dev/github.com/thatInfrastructureGuy/sqltocsvgzip) [![Build Status](https://travis-ci.com/thatInfrastructureGuy/sqltocsvgzip.svg?branch=master)](https://travis-ci.com/github/thatInfrastructureGuy/sqltocsvgzip)
# [sqltocsvgzip](https://pkg.go.dev/github.com/thatInfrastructureGuy/sqltocsvgzip) ![Build Status](https://github.com/thatInfrastructureGuy/sqltocsvgzip/actions/workflows/go.yml/badge.svg?branch=master)

A library designed to convert the sql.Rows result of a query into a **CSV.GZIP** file and/or **upload it to AWS S3**.

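As a usage sketch, assuming the package-level `WriteFile` helper from the upstream README (this PR refactors internals and does not show that API surface, so treat the name as an assumption), exporting a query to a gzipped CSV might look like:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // or any other database/sql driver
	"github.com/thatInfrastructureGuy/sqltocsvgzip"
)

func main() {
	db, err := sql.Open("mysql", "user:password@/dbname")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	rows, err := db.Query("SELECT * FROM users")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	// Write the result set to a gzipped CSV file.
	if err := sqltocsvgzip.WriteFile("users.csv.gzip", rows); err != nil {
		log.Fatal(err)
	}
}
```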
25 changes: 22 additions & 3 deletions config.go
@@ -58,7 +58,7 @@ type Converter struct {
gzipBuf []byte
partNumber int64
uploadQ chan *obj
quit chan bool
quit chan error
}

// CsvPreprocessorFunc is a function type for preprocessing your CSV.
@@ -74,33 +74,52 @@ func (c *Converter) SetRowPreProcessor(processor CsvPreProcessorFunc) {
c.rowPreProcessor = processor
}

func getLogLevel() (level LogLevel) {
levels := map[string]LogLevel{
"ERROR": Error,
"WARN": Warn,
"INFO": Info,
"DEBUG": Debug,
"VERBOSE": Verbose,
}

var ok bool
if level, ok = levels[os.Getenv("LOG_LEVEL")]; !ok {
level = Info
}

return
}

// WriteConfig will return a Converter which will write your CSV however you like
// but will allow you to set a bunch of non-default behaviour like overriding
// headers or injecting a pre-processing step into your conversion
func WriteConfig(rows *sql.Rows) *Converter {
return &Converter{
rows: rows,
quit: make(chan error, 1),
WriteHeaders: true,
Delimiter: ',',
CsvBufferSize: 10 * 1024 * 1024,
CompressionLevel: flate.DefaultCompression,
GzipGoroutines: runtime.GOMAXPROCS(0), // Should be at least the number of cores. Not sure how it impacts cgroup limits.
GzipBatchPerGoroutine: 512 * 1024, // Should be at least 100K
LogLevel: Info,
LogLevel: getLogLevel(),
}
}

// UploadConfig sets the default values for the Converter struct.
func UploadConfig(rows *sql.Rows) *Converter {
return &Converter{
rows: rows,
quit: make(chan error, 1),
WriteHeaders: true,
Delimiter: ',',
CompressionLevel: flate.DefaultCompression,
CsvBufferSize: 10 * 1024 * 1024,
GzipGoroutines: runtime.GOMAXPROCS(0), // Should be at least the number of cores. Not sure how it impacts cgroup limits.
GzipBatchPerGoroutine: 512 * 1024, // Should be at least 100K
LogLevel: Info,
LogLevel: getLogLevel(),
S3Upload: true,
UploadThreads: 4,
UploadPartSize: 50 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024 for s3 upload
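The new getLogLevel helper means the default verbosity can now be overridden through the environment instead of by mutating the struct after construction. A minimal sketch of how a caller might rely on it (the helper name NewVerboseUploader is illustrative; the env var name and accepted values come from getLogLevel above):

```go
package example

import (
	"database/sql"
	"os"

	"github.com/thatInfrastructureGuy/sqltocsvgzip"
)

// NewVerboseUploader is a hypothetical helper: it sets LOG_LEVEL before the
// constructor runs, because getLogLevel is only consulted inside
// UploadConfig/WriteConfig.
func NewVerboseUploader(rows *sql.Rows) *sqltocsvgzip.Converter {
	// Accepted values per getLogLevel: ERROR, WARN, INFO, DEBUG, VERBOSE.
	// Anything else (or unset) falls back to Info.
	os.Setenv("LOG_LEVEL", "DEBUG")
	return sqltocsvgzip.UploadConfig(rows)
}
```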
120 changes: 61 additions & 59 deletions csv.go
@@ -4,10 +4,14 @@ import (
"bytes"
"encoding/csv"
"fmt"
"strconv"
"time"
"sync"
)

type csvBuf struct {
data []byte
lastPart bool
}

func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) {
// Same size as sqlRowBatch
csvBuffer := bytes.NewBuffer(make([]byte, 0, c.CsvBufferSize))
@@ -23,80 +27,78 @@ func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) {
return csvWriter, csvBuffer
}

func (c *Converter) setCSVHeaders(csvWriter *csv.Writer) ([]string, int, error) {
var headers []string
func (c *Converter) getCSVHeaders(csvWriter *csv.Writer) (headers []string, err error) {
columnNames, err := c.rows.Columns()
if err != nil {
return nil, 0, err
return nil, err
}

if c.WriteHeaders {
// use Headers if set, otherwise default to
// query Columns
if len(c.Headers) > 0 {
headers = c.Headers
} else {
headers = columnNames
}
// use Headers if set, otherwise default to
// query Columns
if len(c.Headers) > 0 {
headers = c.Headers
} else {
headers = columnNames
}

// Write to CSV Buffer
err = csvWriter.Write(headers)
if err != nil {
return nil, 0, err
if c.WriteHeaders {
err = csvWriter.Write(headers)
if err != nil {
return nil, err
}
csvWriter.Flush()
}
csvWriter.Flush()

return headers, len(headers), nil
return
}

func (c *Converter) stringify(values []interface{}) []string {
row := make([]string, len(values), len(values))
func (c *Converter) rowToCSV(getHeaders, toCSV chan []string, toGzip chan csvBuf, wg *sync.WaitGroup) {
defer wg.Done()
csvWriter, csvBuffer := c.getCSVWriter()

for i, rawValue := range values {
if rawValue == nil {
row[i] = ""
continue
}
// Get headers
columnNames, err := c.getCSVHeaders(csvWriter)
if err != nil {
close(toGzip)
c.quit <- fmt.Errorf("Error setting CSV Headers: %v", err)
return
}

getHeaders <- columnNames

for row := range toCSV {
c.RowCount = c.RowCount + 1

byteArray, ok := rawValue.([]byte)
if ok {
rawValue = string(byteArray)
// Write to CSV Buffer
err = csvWriter.Write(row)
if err != nil {
close(toGzip)
c.quit <- fmt.Errorf("Error writing to csv buffer: %v", err)
return
}
csvWriter.Flush()

switch castValue := rawValue.(type) {
case time.Time:
if c.TimeFormat != "" {
row[i] = castValue.Format(c.TimeFormat)
// Convert from csv to gzip
if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) {
toGzip <- csvBuf{
data: csvBuffer.Bytes(),
lastPart: false,
}
case bool:
row[i] = strconv.FormatBool(castValue)
case string:
row[i] = castValue
case int:
row[i] = strconv.FormatInt(int64(castValue), 10)
case int8:
row[i] = strconv.FormatInt(int64(castValue), 10)
case int16:
row[i] = strconv.FormatInt(int64(castValue), 10)
case int32:
row[i] = strconv.FormatInt(int64(castValue), 10)
case int64:
row[i] = strconv.FormatInt(int64(castValue), 10)
case uint:
row[i] = strconv.FormatUint(uint64(castValue), 10)
case uint8:
row[i] = strconv.FormatUint(uint64(castValue), 10)
case uint16:
row[i] = strconv.FormatUint(uint64(castValue), 10)
case uint32:
row[i] = strconv.FormatUint(uint64(castValue), 10)
case uint64:
row[i] = strconv.FormatUint(uint64(castValue), 10)
default:
row[i] = fmt.Sprintf("%v", castValue)

// Reset buffer
csvBuffer.Reset()
}
}

return row
// Flush remaining buffer contents to gzip
toGzip <- csvBuf{
data: csvBuffer.Bytes(),
lastPart: true,
}

// Reset buffer
csvBuffer.Reset()

close(toGzip)
}
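For orientation, here is a sketch of how the new rowToCSV stage might be driven. The real driver lives elsewhere in the package (the write/upload code is not part of this diff), so the function name writePipeline and the simplified scanning/stringify step below are illustrative only; it assumes the package's existing imports (fmt, io, sync):

```go
// Hypothetical driver for the new channel pipeline; not part of this diff.
func (c *Converter) writePipeline(w io.Writer) error {
	getHeaders := make(chan []string)
	toCSV := make(chan []string)
	toGzip := make(chan csvBuf)

	wg := &sync.WaitGroup{}
	wg.Add(2)
	go c.rowToCSV(getHeaders, toCSV, toGzip, wg)
	go c.csvToGzip(toGzip, w, wg)

	// rowToCSV reports the column names once, right after handling headers.
	var columnNames []string
	select {
	case columnNames = <-getHeaders:
	case err := <-c.quit:
		close(toCSV)
		wg.Wait()
		return err
	}

	// Scan each row and hand it to the CSV goroutine as a []string.
	values := make([]interface{}, len(columnNames))
	ptrs := make([]interface{}, len(columnNames))
	for i := range values {
		ptrs[i] = &values[i]
	}
	for c.rows.Next() {
		if err := c.rows.Scan(ptrs...); err != nil {
			close(toCSV)
			wg.Wait()
			return err
		}
		row := make([]string, len(values))
		for i, v := range values {
			row[i] = fmt.Sprintf("%v", v) // the real code uses a richer stringify step
		}
		select {
		case toCSV <- row: // normal path
		case err := <-c.quit: // a downstream goroutine bailed out
			close(toCSV)
			wg.Wait()
			return err
		}
	}
	close(toCSV)
	wg.Wait()

	// Surface any error the goroutines reported on the (buffered) quit channel.
	select {
	case err := <-c.quit:
		return err
	default:
		return c.rows.Err()
	}
}
```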
17 changes: 0 additions & 17 deletions getter.go

This file was deleted.

85 changes: 85 additions & 0 deletions gzip.go
@@ -0,0 +1,85 @@
package sqltocsvgzip

import (
"bytes"
"fmt"
"io"
"sync"

"github.com/klauspost/pgzip"
)

func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) {
// Use pgzip for multi-threaded
zw, err := pgzip.NewWriterLevel(writer, c.CompressionLevel)
if err != nil {
return zw, err
}
err = zw.SetConcurrency(c.GzipBatchPerGoroutine, c.GzipGoroutines)
return zw, err
}

func (c *Converter) csvToGzip(toGzip chan csvBuf, w io.Writer, wg *sync.WaitGroup) {
defer wg.Done()
var gzipBuffer *bytes.Buffer
if c.S3Upload {
var ok bool
gzipBuffer, ok = w.(*bytes.Buffer)
if !ok {
c.quit <- fmt.Errorf("Expected buffer. Got %T", w)
return
}
}

// GZIP writer to the underlying file.csv.gzip
zw, err := c.getGzipWriter(w)
if err != nil {
c.quit <- fmt.Errorf("Error creating gzip writer: %v", err)
return
}
defer zw.Close()

for csvBuf := range toGzip {
_, err = zw.Write(csvBuf.data)
if err != nil {
c.quit <- fmt.Errorf("Error writing to gzip buffer: %v", err)
return
}

if csvBuf.lastPart {
err = zw.Close()
if err != nil {
c.quit <- fmt.Errorf("Error flushing contents to gzip writer: %v", err)
return
}
} else {
err = zw.Flush()
if err != nil {
c.quit <- fmt.Errorf("Error flushing contents to gzip writer: %v", err)
return
}
}

// Upload partially created file to S3
// If size of the gzip file exceeds maxFileStorage
if c.S3Upload {
if csvBuf.lastPart || gzipBuffer.Len() >= c.UploadPartSize {
if c.partNumber == 10000 {
c.quit <- fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.")
return
}

// Add to Queue
c.AddToQueue(gzipBuffer, csvBuf.lastPart)

// Reset writer
gzipBuffer.Reset()
}
}
}

// Close channel after sending complete.
if c.S3Upload {
close(c.uploadQ)
}
}
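For context on the 10,000-part guard above: S3 multipart uploads are capped at 10,000 parts, so the largest object a given UploadPartSize can produce is roughly UploadPartSize × 10,000. A quick, purely illustrative check using the default from UploadConfig in this diff:

```go
package main

import "fmt"

func main() {
	const uploadPartSize = 50 * 1024 * 1025 // default UploadPartSize from UploadConfig
	const maxParts = 10000                  // S3 multipart upload part limit

	maxObject := float64(uploadPartSize) * maxParts
	fmt.Printf("largest uploadable object ≈ %.0f GiB\n", maxObject/(1<<30)) // ≈ 489 GiB
}
```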