diff --git a/csv.go b/csv.go index 79dbf80..c9d54c8 100644 --- a/csv.go +++ b/csv.go @@ -54,7 +54,7 @@ func (c *Converter) setCSVHeaders(csvWriter *csv.Writer) ([]string, error) { return headers, nil } -func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync.WaitGroup) { +func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan csvBuf, wg *sync.WaitGroup) { defer wg.Done() csvWriter, csvBuffer := c.getCSVWriter() // Set headers @@ -81,7 +81,7 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync. // Convert from csv to gzip if csvBuffer.Len() >= (c.GzipBatchPerGoroutine * c.GzipGoroutines) { - toGzip <- &csvBuf{ + toGzip <- csvBuf{ data: csvBuffer.Bytes(), lastPart: false, } @@ -92,7 +92,7 @@ func (c *Converter) rowToCSV(toCSV chan []string, toGzip chan *csvBuf, wg *sync. } // Flush remaining buffer contents to gzip - toGzip <- &csvBuf{ + toGzip <- csvBuf{ data: csvBuffer.Bytes(), lastPart: true, } diff --git a/gzip.go b/gzip.go index 961456b..fc3bec4 100644 --- a/gzip.go +++ b/gzip.go @@ -19,7 +19,7 @@ func (c *Converter) getGzipWriter(writer io.Writer) (*pgzip.Writer, error) { return zw, err } -func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer, wg *sync.WaitGroup) { +func (c *Converter) csvToGzip(toGzip chan csvBuf, w io.Writer, wg *sync.WaitGroup) { defer wg.Done() var gzipBuffer *bytes.Buffer if c.S3Upload { @@ -46,16 +46,16 @@ func (c *Converter) csvToGzip(toGzip chan *csvBuf, w io.Writer, wg *sync.WaitGro return } + err = zw.Flush() + if err != nil { + c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) + return + } + if csvBuf.lastPart { err = zw.Close() if err != nil { - c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) - return - } - } else { - err = zw.Flush() - if err != nil { - c.quit <- fmt.Errorf("Error flushing contents to gzip writer: ", err) + c.quit <- fmt.Errorf("Error closing gzip writer: ", err) return } } diff --git a/sqltocsvgzip.go b/sqltocsvgzip.go index 5f6a177..767fc7e 100644 --- a/sqltocsvgzip.go +++ b/sqltocsvgzip.go @@ -141,7 +141,7 @@ func (c *Converter) Write(w io.Writer) (err error) { toPreprocess := make(chan []interface{}) toCSV := make(chan []string) - toGzip := make(chan *csvBuf) + toGzip := make(chan csvBuf) // Create 3 goroutines wg := &sync.WaitGroup{}