diff --git a/README.md b/README.md
index ad6455a..e04d99b 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# [sqltocsvgzip](https://pkg.go.dev/github.com/thatInfrastructureGuy/sqltocsvgzip) [![Build Status](https://travis-ci.com/thatInfrastructureGuy/sqltocsvgzip.svg?branch=master)](https://travis-ci.com/github/thatInfrastructureGuy/sqltocsvgzip)
+# [sqltocsvgzip](https://pkg.go.dev/github.com/thatInfrastructureGuy/sqltocsvgzip) ![Build Status](https://github.com/thatInfrastructureGuy/sqltocsvgzip/actions/workflows/go.yml/badge.svg?branch=master)
 
 A library designed to convert sql.Rows result from a query into a **CSV.GZIP** file and/or **upload to AWS S3**.
 
diff --git a/config.go b/config.go
index 5b70b3e..e83a61f 100644
--- a/config.go
+++ b/config.go
@@ -58,7 +58,7 @@ type Converter struct {
 	gzipBuf    []byte
 	partNumber int64
 	uploadQ    chan *obj
-	quit       chan bool
+	quit       chan error
 }
 
 // CsvPreprocessorFunc is a function type for preprocessing your CSV.
@@ -74,19 +74,37 @@ func (c *Converter) SetRowPreProcessor(processor CsvPreProcessorFunc) {
 	c.rowPreProcessor = processor
 }
 
+func getLogLevel() (level LogLevel) {
+	levels := map[string]LogLevel{
+		"ERROR":   Error,
+		"WARN":    Warn,
+		"INFO":    Info,
+		"DEBUG":   Debug,
+		"VERBOSE": Verbose,
+	}
+
+	var ok bool
+	if level, ok = levels[os.Getenv("LOG_LEVEL")]; !ok {
+		level = Info
+	}
+
+	return
+}
+
 // WriteConfig will return a Converter which will write your CSV however you like
 // but will allow you to set a bunch of non-default behaivour like overriding
 // headers or injecting a pre-processing step into your conversion
 func WriteConfig(rows *sql.Rows) *Converter {
 	return &Converter{
 		rows:                  rows,
+		quit:                  make(chan error, 1),
 		WriteHeaders:          true,
 		Delimiter:             ',',
 		CsvBufferSize:         10 * 1024 * 1024,
 		CompressionLevel:      flate.DefaultCompression,
 		GzipGoroutines:        runtime.GOMAXPROCS(0), // Should be atleast the number of cores. Not sure how it impacts cgroup limits.
 		GzipBatchPerGoroutine: 512 * 1024,            // Should be atleast 100K
-		LogLevel:              Info,
+		LogLevel:              getLogLevel(),
 	}
 }
 
@@ -94,13 +112,14 @@ func WriteConfig(rows *sql.Rows) *Converter {
 func UploadConfig(rows *sql.Rows) *Converter {
 	return &Converter{
 		rows:                  rows,
+		quit:                  make(chan error, 1),
 		WriteHeaders:          true,
 		Delimiter:             ',',
 		CompressionLevel:      flate.DefaultCompression,
 		CsvBufferSize:         10 * 1024 * 1024,
 		GzipGoroutines:        runtime.GOMAXPROCS(0), // Should be atleast the number of cores. Not sure how it impacts cgroup limits.
 		GzipBatchPerGoroutine: 512 * 1024,            // Should be atleast 100K
-		LogLevel:              Info,
+		LogLevel:              getLogLevel(),
 		S3Upload:              true,
 		UploadThreads:         4,
 		UploadPartSize:        50 * 1024 * 1025, // Should be greater than 5 * 1024 * 1024 for s3 upload
diff --git a/csv.go b/csv.go
index 02fb946..3b0a3d0 100644
--- a/csv.go
+++ b/csv.go
@@ -4,10 +4,14 @@ import (
 	"bytes"
 	"encoding/csv"
 	"fmt"
-	"strconv"
-	"time"
+	"sync"
 )
 
+type csvBuf struct {
+	data     []byte
+	lastPart bool
+}
+
 func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) {
 	// Same size as sqlRowBatch
 	csvBuffer := bytes.NewBuffer(make([]byte, 0, c.CsvBufferSize))
@@ -23,80 +27,78 @@ func (c *Converter) getCSVWriter() (*csv.Writer, *bytes.Buffer) {
 	return csvWriter, csvBuffer
 }
 
-func (c *Converter) setCSVHeaders(csvWriter *csv.Writer) ([]string, int, error) {
-	var headers []string
+func (c *Converter) getCSVHeaders(csvWriter *csv.Writer) (headers []string, err error) {
 	columnNames, err := c.rows.Columns()
 	if err != nil {
-		return nil, 0, err
+		return nil, err
 	}
 
-	if c.WriteHeaders {
-		// use Headers if set, otherwise default to
-		// query Columns
-		if len(c.Headers) > 0 {
-			headers = c.Headers
-		} else {
-			headers = columnNames
-		}
+	// use Headers if set, otherwise default to
+	// query Columns
+	if len(c.Headers) > 0 {
+		headers = c.Headers
+	} else {
+		headers = columnNames
 	}
 
 	// Write to CSV Buffer
-	err = csvWriter.Write(headers)
-	if err != nil {
-		return nil, 0, err
+	if c.WriteHeaders {
+		err = csvWriter.Write(headers)
+		if err != nil {
+			return nil, err
+		}
+		csvWriter.Flush()
 	}
-	csvWriter.Flush()
 
-	return headers, len(headers), nil
+	return
 }
 
-func (c *Converter) stringify(values []interface{}) []string {
-	row := make([]string, len(values), len(values))
+func (c *Converter) rowToCSV(getHeaders, toCSV chan []string, toGzip chan csvBuf, wg *sync.WaitGroup) {
+	defer wg.Done()
+	csvWriter, csvBuffer := c.getCSVWriter()
 
-	for i, rawValue := range values {
-		if rawValue == nil {
-			row[i] = ""
-			continue
-		}
+	// Get headers
+	columnNames, err := c.getCSVHeaders(csvWriter)
+	if err != nil {
+		close(toGzip)
+		c.quit <- fmt.Errorf("Error setting CSV Headers: %v", err)
+		return
+	}
+
+	getHeaders <- columnNames
+
+	for row := range toCSV {
+		c.RowCount = c.RowCount + 1
 
-		byteArray, ok := rawValue.([]byte)
-		if ok {
-			rawValue = string(byteArray)
+		// Write to CSV Buffer
+		err = csvWriter.Write(row)
+		if err != nil {
+			close(toGzip)
+			c.quit <- fmt.Errorf("Error writing to csv buffer: %v", err)
+			return
 		}
+		csvWriter.Flush()
 
-		switch castValue := rawValue.(type) {
-		case time.Time:
-			if c.TimeFormat != "" {
-				row[i] = castValue.Format(c.TimeFormat)
+		// Convert from csv to gzip
+		if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) {
+			toGzip <- csvBuf{
+				data:     csvBuffer.Bytes(),
+				lastPart: false,
 			}
-		case bool:
-			row[i] = strconv.FormatBool(castValue)
-		case string:
-			row[i] = castValue
-		case int:
-			row[i] = strconv.FormatInt(int64(castValue), 10)
-		case int8:
-			row[i] = strconv.FormatInt(int64(castValue), 10)
-		case int16:
-			row[i] = strconv.FormatInt(int64(castValue), 10)
-		case int32:
-			row[i] = strconv.FormatInt(int64(castValue), 10)
-		case int64:
-			row[i] = strconv.FormatInt(int64(castValue), 10)
-		case uint:
-			row[i] = strconv.FormatUint(uint64(castValue), 10)
-		case uint8:
-			row[i] = strconv.FormatUint(uint64(castValue), 10)
-		case uint16:
-			row[i] = strconv.FormatUint(uint64(castValue), 10)
-		case uint32:
-			row[i] = strconv.FormatUint(uint64(castValue), 10)
-		case uint64:
-			row[i] = strconv.FormatUint(uint64(castValue), 10)
-		default:
-			row[i] = fmt.Sprintf("%v", castValue)
+
+			// Reset buffer
+			csvBuffer.Reset()
 		}
 	}
 
-	return row
+	// Flush remaining buffer contents to gzip
+	toGzip <- csvBuf{
+		data:     csvBuffer.Bytes(),
+		lastPart: true,
+	}
+
+	// Reset buffer
+	csvBuffer.Reset()
+
+	close(toGzip)
 }
diff --git a/getter.go b/getter.go
deleted file mode 100644
index 7c9a282..0000000
--- a/getter.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package sqltocsvgzip
-
-import (
-	"io"
-
-	"github.com/klauspost/pgzip"
-)
-
-func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) {
-	// Use pgzip for multi-threaded
-	zw, err := pgzip.NewWriterLevel(writer, c.CompressionLevel)
-	if err != nil {
-		return zw, err
-	}
-	err = zw.SetConcurrency(c.GzipBatchPerGoroutine, c.GzipGoroutines)
-	return zw, err
-}
diff --git a/gzip.go b/gzip.go
new file mode 100644
index 0000000..ae9c51e
--- /dev/null
+++ b/gzip.go
@@ -0,0 +1,85 @@
+package sqltocsvgzip
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/klauspost/pgzip"
+)
+
+func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) {
+	// Use pgzip for multi-threaded
+	zw, err := pgzip.NewWriterLevel(writer, c.CompressionLevel)
+	if err != nil {
+		return zw, err
+	}
+	err = zw.SetConcurrency(c.GzipBatchPerGoroutine, c.GzipGoroutines)
+	return zw, err
+}
+
+func (c *Converter) csvToGzip(toGzip chan csvBuf, w io.Writer, wg *sync.WaitGroup) {
+	defer wg.Done()
+	var gzipBuffer *bytes.Buffer
+	if c.S3Upload {
+		var ok bool
+		gzipBuffer, ok = w.(*bytes.Buffer)
+		if !ok {
+			c.quit <- fmt.Errorf("Expected buffer. Got %T", w)
+			return
+		}
+	}
+
+	// GZIP writer to underlying file.csv.gzip
+	zw, err := c.getGzipWriter(w)
+	if err != nil {
+		c.quit <- fmt.Errorf("Error creating gzip writer: %v", err)
+		return
+	}
+	defer zw.Close()
+
+	for csvBuf := range toGzip {
+		_, err = zw.Write(csvBuf.data)
+		if err != nil {
+			c.quit <- fmt.Errorf("Error writing to gzip buffer: %v", err)
+			return
+		}
+
+		if csvBuf.lastPart {
+			err = zw.Close()
+			if err != nil {
+				c.quit <- fmt.Errorf("Error flushing contents to gzip writer: %v", err)
+				return
+			}
+		} else {
+			err = zw.Flush()
+			if err != nil {
+				c.quit <- fmt.Errorf("Error flushing contents to gzip writer: %v", err)
+				return
+			}
+		}
+
+		// Upload partially created file to S3
+		// If size of the gzip file exceeds maxFileStorage
+		if c.S3Upload {
+			if csvBuf.lastPart || gzipBuffer.Len() >= c.UploadPartSize {
+				if c.partNumber == 10000 {
+					c.quit <- fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.")
+					return
+				}
+
+				// Add to Queue
+				c.AddToQueue(gzipBuffer, csvBuf.lastPart)
+
+				//Reset writer
+				gzipBuffer.Reset()
+			}
+		}
+	}
+
+	// Close channel after sending is complete.
+	if c.S3Upload {
+		close(c.uploadQ)
+	}
+}
diff --git a/preprocess.go b/preprocess.go
new file mode 100644
index 0000000..285ebcc
--- /dev/null
+++ b/preprocess.go
@@ -0,0 +1,78 @@
+package sqltocsvgzip
+
+import (
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+)
+
+func (c *Converter) stringify(values []interface{}) []string {
+	row := make([]string, len(values), len(values))
+
+	for i, rawValue := range values {
+		if rawValue == nil {
+			row[i] = ""
+			continue
+		}
+
+		byteArray, ok := rawValue.([]byte)
+		if ok {
+			rawValue = string(byteArray)
+		}
+
+		switch castValue := rawValue.(type) {
+		case time.Time:
+			if c.TimeFormat != "" {
+				row[i] = castValue.Format(c.TimeFormat)
+			}
+		case bool:
+			row[i] = strconv.FormatBool(castValue)
+		case string:
+			row[i] = castValue
+		case int:
+			row[i] = strconv.FormatInt(int64(castValue), 10)
+		case int8:
+			row[i] = strconv.FormatInt(int64(castValue), 10)
+		case int16:
+			row[i] = strconv.FormatInt(int64(castValue), 10)
+		case int32:
+			row[i] = strconv.FormatInt(int64(castValue), 10)
+		case int64:
+			row[i] = strconv.FormatInt(int64(castValue), 10)
+		case uint:
+			row[i] = strconv.FormatUint(uint64(castValue), 10)
+		case uint8:
+			row[i] = strconv.FormatUint(uint64(castValue), 10)
+		case uint16:
+			row[i] = strconv.FormatUint(uint64(castValue), 10)
+		case uint32:
+			row[i] = strconv.FormatUint(uint64(castValue), 10)
+		case uint64:
+			row[i] = strconv.FormatUint(uint64(castValue), 10)
+		default:
+			row[i] = fmt.Sprintf("%v", castValue)
+		}
+	}
+
+	return row
+}
+
+func (c *Converter) preProcessRows(toPreprocess chan []interface{}, columnNames []string, toCSV chan []string, wg *sync.WaitGroup) {
+	defer wg.Done()
+	writeRow := true
+
+	for values := range toPreprocess {
+		row := c.stringify(values)
+
+		if c.rowPreProcessor != nil {
+			writeRow, row = c.rowPreProcessor(row, columnNames)
+		}
+
+		if writeRow {
+			toCSV <- row
+		}
+	}
+
+	close(toCSV)
+}
diff --git a/sort.go b/s3_sort_completed_parts.go
similarity index 100%
rename from sort.go
rename to s3_sort_completed_parts.go
diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go
index 0978f39..af076bc 100644
--- a/sqltocsvgzip.go
+++ b/sqltocsvgzip.go
@@ -19,7 +19,7 @@ import (
 
 // WriteFile will write a CSV.GZIP file to the file name specified (with headers)
 // based on whatever is in the sql.Rows you pass in.
-func WriteFile(csvGzipFileName string, rows *sql.Rows) error {
+func WriteFile(csvGzipFileName string, rows *sql.Rows) (rowCount int64, err error) {
 	return WriteConfig(rows).WriteFile(csvGzipFileName)
 }
 
@@ -28,7 +28,7 @@ func WriteFile(csvGzipFileName string, rows *sql.Rows) error {
 // UploadToS3 looks for the following environment variables.
 // Required: S3_BUCKET, S3_PATH, S3_REGION
 // Optional: S3_ACL (default => bucket-owner-full-control)
-func UploadToS3(rows *sql.Rows) error {
+func UploadToS3(rows *sql.Rows) (rowCount int64, err error) {
 	return UploadConfig(rows).Upload()
 }
 
@@ -36,50 +36,45 @@
 // Creates a Multipart AWS requests.
 // Completes the multipart request if all uploads are successful.
 // Aborts the operation when an error is received.
-func (c *Converter) Upload() error {
+func (c *Converter) Upload() (rowCount int64, err error) {
 	if c.UploadPartSize < minFileSize {
-		return fmt.Errorf("UploadPartSize should be greater than %v\n", minFileSize)
+		return 0, fmt.Errorf("UploadPartSize should be greater than %v\n", minFileSize)
 	}
 
 	// Create MultiPart S3 Upload
-	err := c.createS3Session()
+	err = c.createS3Session()
 	if err != nil {
-		return err
+		return 0, err
 	}
 
 	err = c.createMultipartRequest()
 	if err != nil {
-		return err
+		return 0, err
 	}
 
-	wg := sync.WaitGroup{}
-	buf := bytes.Buffer{}
 	c.uploadQ = make(chan *obj, c.UploadThreads)
-	c.quit = make(chan bool, 1)
+	wg := &sync.WaitGroup{}
 
 	// Upload Parts to S3
 	for i := 0; i < c.UploadThreads; i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			err = c.UploadPart()
-			if err != nil {
-				c.writeLog(Error, err.Error())
-			}
+			c.UploadPart()
 		}()
 	}
 
+	buf := bytes.Buffer{}
 	err = c.Write(&buf)
 	if err != nil {
 		// Abort S3 Upload
 		awserr := c.abortMultipartUpload()
 		if awserr != nil {
-			return awserr
+			return 0, awserr
 		}
-		return err
+		return 0, err
 	}
 
-	close(c.uploadQ)
 	wg.Wait()
 
 	if c.partNumber == 0 {
@@ -87,14 +82,14 @@ func (c *Converter) Upload() error {
 		c.writeLog(Info, "Gzip file < 5 MB. Enable direct upload. Abort multipart upload.")
 		err = c.abortMultipartUpload()
 		if err != nil {
-			return err
+			return 0, err
 		}
 
 		err = c.UploadObjectToS3(&buf)
 		if err != nil {
-			return err
+			return 0, err
 		}
-		return nil
+		return c.RowCount, nil
 	}
 
 	// Sort completed parts
@@ -105,25 +100,25 @@ func (c *Converter) Upload() error {
 		// Abort S3 Upload
 		awserr := c.abortMultipartUpload()
 		if awserr != nil {
-			return awserr
+			return 0, awserr
 		}
-		return err
+		return 0, err
 	}
 
 	uploadPath, err := url.PathUnescape(completeResponse.String())
 	if err != nil {
-		return err
+		return 0, err
 	}
 
 	c.writeLog(Info, "Successfully uploaded file: "+uploadPath)
-	return nil
+	return c.RowCount, nil
 }
 
 // WriteFile writes the csv.gzip to the filename specified, return an error if problem
-func (c *Converter) WriteFile(csvGzipFileName string) error {
+func (c *Converter) WriteFile(csvGzipFileName string) (rowCount int64, err error) {
 	f, err := os.Create(csvGzipFileName)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	defer f.Close()
 
@@ -132,50 +127,51 @@
 	err = c.Write(f)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
-	return nil
+	return c.RowCount, nil
 }
 
 // Write writes the csv.gzip to the Writer provided
-func (c *Converter) Write(w io.Writer) error {
-	writeRow := true
+func (c *Converter) Write(w io.Writer) (err error) {
 	interrupt := make(chan os.Signal, 1)
 	signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
 	defer signal.Stop(interrupt)
 
-	csvWriter, csvBuffer := c.getCSVWriter()
+	toPreprocess := make(chan []interface{})
+	getHeaders, toCSV := make(chan []string), make(chan []string)
+	toGzip := make(chan csvBuf)
 
-	// Set headers
-	columnNames, totalColumns, err := c.setCSVHeaders(csvWriter)
-	if err != nil {
-		return err
-	}
+	// Create 3 goroutines
+	wg := &sync.WaitGroup{}
+	wg.Add(3)
+
+	go c.rowToCSV(getHeaders, toCSV, toGzip, wg)
+	headers := <-getHeaders
+
+	go c.preProcessRows(toPreprocess, headers, toCSV, wg)
+	go c.csvToGzip(toGzip, w, wg)
 
 	// Buffers for each iteration
-	values := make([]interface{}, totalColumns, totalColumns)
-	valuePtrs := make([]interface{}, totalColumns, totalColumns)
+	values := make([]interface{}, len(headers), len(headers))
+	valuePtrs := make([]interface{}, len(headers), len(headers))
 
-	for i := range columnNames {
+	for i := range headers {
 		valuePtrs[i] = &values[i]
 	}
 
-	zw, err := c.getGzipWriter(w)
-	if err != nil {
-		return err
-	}
-	defer zw.Close()
-
 	// Iterate over sql rows
 	for c.rows.Next() {
 		select {
-		case <-c.quit:
+		case err := <-c.quit:
+			close(toPreprocess)
 			c.abortMultipartUpload()
-			return fmt.Errorf("Received quit signal. Exiting.")
+			return fmt.Errorf("Error received: %v\n", err)
 		case <-interrupt:
+			close(toPreprocess)
 			c.abortMultipartUpload()
-			return fmt.Errorf("Received quit signal. Exiting.")
+			return fmt.Errorf("Received os interrupt signal. Exiting.")
 		default:
 			// Do nothing
 		}
@@ -184,60 +180,7 @@ func (c *Converter) Write(w io.Writer) error {
 			return err
 		}
 
-		row := c.stringify(values)
-
-		if c.rowPreProcessor != nil {
-			writeRow, row = c.rowPreProcessor(row, columnNames)
-		}
-
-		if writeRow {
-			c.RowCount = c.RowCount + 1
-
-			// Write to CSV Buffer
-			err = csvWriter.Write(row)
-			if err != nil {
-				return err
-			}
-			csvWriter.Flush()
-
-			// Convert from csv to gzip
-			// Writes from buffer to underlying file
-			if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) {
-				_, err = zw.Write(csvBuffer.Bytes())
-				if err != nil {
-					return err
-				}
-				err = zw.Flush()
-				if err != nil {
-					return err
-				}
-
-				// Reset buffer
-				csvBuffer.Reset()
-
-				// Upload partially created file to S3
-				// If size of the gzip file exceeds maxFileStorage
-				if c.S3Upload {
-					// GZIP writer to underline file.csv.gzip
-					gzipBuffer, ok := w.(*bytes.Buffer)
-					if !ok {
-						return fmt.Errorf("Expected buffer. Got %T", w)
-					}
-
-					if gzipBuffer.Len() >= c.UploadPartSize {
-						if c.partNumber == 10000 {
-							return fmt.Errorf("Number of parts cannot exceed 10000. Please increase UploadPartSize and try again.")
-						}
-
-						// Add to Queue
-						c.AddToQueue(gzipBuffer, false)
-
-						//Reset writer
-						gzipBuffer.Reset()
-					}
-				}
-			}
-		}
+		toPreprocess <- values
 	}
 
 	err = c.rows.Err()
@@ -245,36 +188,9 @@ func (c *Converter) Write(w io.Writer) error {
 		return err
 	}
 
-	_, err = zw.Write(csvBuffer.Bytes())
-	if err != nil {
-		return err
-	}
-	err = zw.Close()
-	if err != nil {
-		return err
-	}
-
-	//Wipe the buffer
-	csvBuffer.Reset()
-
-	// Upload last part of the file to S3
-	if c.S3Upload {
-		if c.partNumber == 0 {
-			return nil
-		}
-
-		// GZIP writer to underline file.csv.gzip
-		gzipBuffer, ok := w.(*bytes.Buffer)
-		if !ok {
-			return fmt.Errorf("Expected buffer. Got %T", w)
-		}
+	close(toPreprocess)
 
-		// Add to Queue for multipart upload
-		c.AddToQueue(gzipBuffer, true)
-
-		//Reset writer
-		gzipBuffer.Reset()
-	}
+	wg.Wait()
 
 	// Log the total number of rows processed.
 	c.writeLog(Info, fmt.Sprintf("Total sql rows processed: %v", c.RowCount))
@@ -328,19 +244,16 @@ func (c *Converter) AddToQueue(buf *bytes.Buffer, lastPart bool) {
 // UploadPart listens to upload queue. Whenever an obj is received,
 // it is then uploaded to AWS.
 // Abort operation is called if any error is received.
-func (c *Converter) UploadPart() (err error) {
+func (c *Converter) UploadPart() {
 	mu := &sync.RWMutex{}
 	for s3obj := range c.uploadQ {
-		err = c.uploadPart(s3obj.partNumber, s3obj.buf, mu)
+		err := c.uploadPart(s3obj.partNumber, s3obj.buf, mu)
 		if err != nil {
-			c.writeLog(Error, "Error occurred. Sending quit signal to writer.")
-			c.quit <- true
-			c.abortMultipartUpload()
-			return err
+			c.quit <- fmt.Errorf("Error uploading part: %v\n", err)
+			return
 		}
 	}
 	c.writeLog(Debug, "Received closed signal")
-	return
 }
 
 // writeLog decides whether to write a log to stdout depending on LogLevel.
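For context, here is a minimal usage sketch against the updated public API: `WriteFile` and `UploadToS3` now return the processed row count alongside the error, and the log level is read from the `LOG_LEVEL` environment variable. The driver import, DSN, and query below are illustrative placeholders, not part of this change.

```go
package main

import (
	"database/sql"
	"log"
	"os"

	_ "github.com/go-sql-driver/mysql" // placeholder driver; use whichever driver matches your database
	"github.com/thatInfrastructureGuy/sqltocsvgzip"
)

func main() {
	db, err := sql.Open("mysql", os.Getenv("DSN")) // DSN is a placeholder
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	rows, err := db.Query("SELECT * FROM some_table") // placeholder query
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	// WriteFile now returns the row count in addition to the error.
	rowCount, err := sqltocsvgzip.WriteFile("result.csv.gz", rows)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("wrote %d rows", rowCount)

	// For S3 multipart upload instead, set S3_BUCKET, S3_PATH, S3_REGION
	// (and optionally S3_ACL, LOG_LEVEL) and call, with a fresh *sql.Rows:
	//   rowCount, err := sqltocsvgzip.UploadToS3(rows)
}
```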