Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable row group length for writing #80

Merged
merged 1 commit into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/gpq/command/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ConvertCmd struct {
Max int `help:"Maximum number of features to consider when building a schema." default:"100"`
InputPrimaryColumn string `help:"Primary geometry column name when reading Parquet without metadata." default:"geometry"`
Compression string `help:"Parquet compression to use. Possible values: ${enum}." enum:"uncompressed, snappy, gzip, brotli, zstd" default:"zstd"`
RowGroupLength int `help:"Maximum number of rows per group when writing Parquet."`
}

type FormatType string
Expand Down Expand Up @@ -149,7 +150,12 @@ func (c *ConvertCmd) Run() error {
if outputFormat != ParquetType && outputFormat != GeoParquetType {
return errors.New("GeoJSON input can only be converted to GeoParquet")
}
convertOptions := &geojson.ConvertOptions{MinFeatures: c.Min, MaxFeatures: c.Max, Compression: c.Compression}
convertOptions := &geojson.ConvertOptions{
MinFeatures: c.Min,
MaxFeatures: c.Max,
Compression: c.Compression,
RowGroupLength: c.RowGroupLength,
}
return geojson.ToParquet(input, output, convertOptions)
}

Expand All @@ -160,6 +166,7 @@ func (c *ConvertCmd) Run() error {
convertOptions := &geoparquet.ConvertOptions{
InputPrimaryColumn: c.InputPrimaryColumn,
Compression: c.Compression,
RowGroupLength: c.RowGroupLength,
}

return geoparquet.FromParquet(input, output, convertOptions)
Expand Down
18 changes: 13 additions & 5 deletions internal/geojson/geojson.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ func FromParquet(reader parquet.ReaderAtSeeker, writer io.Writer) error {
}

// ConvertOptions controls how GeoJSON input is converted to GeoParquet
// by ToParquet.
//
// NOTE: the diff rendering had duplicated the pre-merge field list here,
// which is not valid Go (duplicate field names); this is the merged form.
type ConvertOptions struct {
	// MinFeatures is presumably the minimum number of features to read
	// before a schema is inferred — TODO confirm against the command help.
	MinFeatures int
	// MaxFeatures is the maximum number of features to consider when
	// building a schema.
	MaxFeatures int
	// Compression names the Parquet compression codec to use
	// (e.g. "uncompressed", "snappy", "gzip", "brotli", "zstd").
	Compression string
	// RowGroupLength is the maximum number of rows per row group when
	// writing Parquet; zero or negative leaves the writer's default.
	RowGroupLength int
	// Metadata looks like user-supplied GeoParquet metadata — verify
	// against its use elsewhere in this package.
	Metadata string
}

var defaultOptions = &ConvertOptions{
Expand All @@ -80,12 +81,19 @@ func ToParquet(input io.Reader, output io.Writer, convertOptions *ConvertOptions
featuresRead := 0

var pqWriterProps *parquet.WriterProperties
var writerOptions []parquet.WriterProperty
if convertOptions.Compression != "" {
compression, err := pqutil.GetCompression(convertOptions.Compression)
if err != nil {
return err
}
pqWriterProps = parquet.NewWriterProperties(parquet.WithCompression(compression))
writerOptions = append(writerOptions, parquet.WithCompression(compression))
}
if convertOptions.RowGroupLength > 0 {
writerOptions = append(writerOptions, parquet.WithMaxRowGroupLength(int64(convertOptions.RowGroupLength)))
}
if len(writerOptions) > 0 {
pqWriterProps = parquet.NewWriterProperties(writerOptions...)
}

var featureWriter *geoparquet.FeatureWriter
Expand Down
36 changes: 36 additions & 0 deletions internal/geojson/geojson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,42 @@ func TestToParquet(t *testing.T) {
assert.JSONEq(t, string(expected), geojsonBuffer.String())
}

// TestToParquetRowGroupLength3 converts ten features with a row group
// length of 3 and expects four row groups (3 + 3 + 3 + 1) in the output.
func TestToParquetRowGroupLength3(t *testing.T) {
	input, err := os.Open("testdata/ten-points.geojson")
	require.NoError(t, err)

	output := &bytes.Buffer{}
	options := &geojson.ConvertOptions{RowGroupLength: 3}
	assert.NoError(t, geojson.ToParquet(input, output, options))

	reader, readerErr := file.NewParquetReader(bytes.NewReader(output.Bytes()))
	require.NoError(t, readerErr)
	defer reader.Close()

	assert.Equal(t, 4, reader.NumRowGroups())
}

// TestToParquetRowGroupLength5 converts ten features with a row group
// length of 5 and expects two row groups (5 + 5) in the output.
func TestToParquetRowGroupLength5(t *testing.T) {
	input, err := os.Open("testdata/ten-points.geojson")
	require.NoError(t, err)

	output := &bytes.Buffer{}
	options := &geojson.ConvertOptions{RowGroupLength: 5}
	assert.NoError(t, geojson.ToParquet(input, output, options))

	reader, readerErr := file.NewParquetReader(bytes.NewReader(output.Bytes()))
	require.NoError(t, readerErr)
	defer reader.Close()

	assert.Equal(t, 2, reader.NumRowGroups())
}

func TestToParquetMismatchedTypes(t *testing.T) {
geojsonFile, openErr := os.Open("testdata/mismatched-types.geojson")
require.NoError(t, openErr)
Expand Down
105 changes: 105 additions & 0 deletions internal/geojson/testdata/ten-points.geojson
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": {
"num": 0
},
"geometry": {
"type": "Point",
"coordinates": [0, 0]
}
},
{
"type": "Feature",
"properties": {
"num": 1
},
"geometry": {
"type": "Point",
"coordinates": [1, 1]
}
},
{
"type": "Feature",
"properties": {
"num": 2
},
"geometry": {
"type": "Point",
"coordinates": [2, 2]
}
},
{
"type": "Feature",
"properties": {
"num": 3
},
"geometry": {
"type": "Point",
"coordinates": [3, 3]
}
},
{
"type": "Feature",
"properties": {
"num": 4
},
"geometry": {
"type": "Point",
"coordinates": [4, 4]
}
},
{
"type": "Feature",
"properties": {
"num": 5
},
"geometry": {
"type": "Point",
"coordinates": [5, 5]
}
},
{
"type": "Feature",
"properties": {
"num": 6
},
"geometry": {
"type": "Point",
"coordinates": [6, 6]
}
},
{
"type": "Feature",
"properties": {
"num": 7
},
"geometry": {
"type": "Point",
"coordinates": [7, 7]
}
},
{
"type": "Feature",
"properties": {
"num": 8
},
"geometry": {
"type": "Point",
"coordinates": [8, 8]
}
},
{
"type": "Feature",
"properties": {
"num": 9
},
"geometry": {
"type": "Point",
"coordinates": [9, 9]
}
}
]
}
2 changes: 2 additions & 0 deletions internal/geoparquet/geoparquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// ConvertOptions controls Parquet-to-GeoParquet conversion in FromParquet.
type ConvertOptions struct {
	// InputPrimaryColumn is the primary geometry column name used when the
	// input Parquet carries no metadata — presumably defaulted by the
	// caller; confirm against the command-line layer.
	InputPrimaryColumn string
	// Compression names the Parquet compression codec for the output file.
	Compression string
	// RowGroupLength is the maximum number of rows per row group when
	// writing; values <= 0 leave the writer's default in effect.
	RowGroupLength int
}

func getMetadata(fileReader *file.Reader, convertOptions *ConvertOptions) *Metadata {
Expand Down Expand Up @@ -171,6 +172,7 @@ func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions
TransformColumn: transformColumn,
BeforeClose: beforeClose,
Compression: compression,
RowGroupLength: convertOptions.RowGroupLength,
}

return pqutil.TransformByColumn(config)
Expand Down
98 changes: 77 additions & 21 deletions internal/pqutil/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type TransformConfig struct {
Reader parquet.ReaderAtSeeker
Writer io.Writer
Compression *compress.Compression
RowGroupLength int
TransformSchema SchemaTransformer
TransformColumn ColumnTransformer
BeforeClose func(*file.Reader, *file.Writer) error
Expand Down Expand Up @@ -50,6 +51,10 @@ func getWriterProperties(config *TransformConfig, fileReader *file.Reader) (*par
}
}

if config.RowGroupLength > 0 {
writerProperties = append(writerProperties, parquet.WithMaxRowGroupLength(int64(config.RowGroupLength)))
}

return parquet.NewWriterProperties(writerProperties...), nil
}

Expand Down Expand Up @@ -104,34 +109,85 @@ func TransformByColumn(config *TransformConfig) error {

ctx := pqarrow.NewArrowWriteContext(context.Background(), nil)

numRowGroups := fileReader.NumRowGroups()
for rowGroupIndex := 0; rowGroupIndex < numRowGroups; rowGroupIndex += 1 {
rowGroupReader := arrowReader.RowGroup(rowGroupIndex)
rowGroupWriter := fileWriter.AppendRowGroup()
if config.RowGroupLength > 0 {
columnReaders := make([]*pqarrow.ColumnReader, numFields)
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
arr, readErr := rowGroupReader.Column(fieldNum).Read(ctx)
if readErr != nil {
return readErr
colReader, err := arrowReader.GetColumn(ctx, fieldNum)
if err != nil {
return err
}
if config.TransformColumn != nil {
inputField := inputManifest.Fields[fieldNum].Field
outputField := outputManifest.Fields[fieldNum].Field
transformed, err := config.TransformColumn(inputField, outputField, arr)
if err != nil {
return err
columnReaders[fieldNum] = colReader
}

numRows := fileReader.NumRows()
numRowsWritten := int64(0)
for {
rowGroupWriter := fileWriter.AppendRowGroup()
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
colReader := columnReaders[fieldNum]
arr, readErr := colReader.NextBatch(int64(config.RowGroupLength))
if readErr != nil {
return readErr
}
if transformed.DataType() != outputField.Type {
return fmt.Errorf("transform generated an unexpected type, got %s, expected %s", transformed.DataType().Name(), outputField.Type.Name())
if config.TransformColumn != nil {
inputField := inputManifest.Fields[fieldNum].Field
outputField := outputManifest.Fields[fieldNum].Field
transformed, err := config.TransformColumn(inputField, outputField, arr)
if err != nil {
return err
}
if transformed.DataType() != outputField.Type {
return fmt.Errorf("transform generated an unexpected type, got %s, expected %s", transformed.DataType().Name(), outputField.Type.Name())
}
arr = transformed
}
colWriter, colWriterErr := pqarrow.NewArrowColumnWriter(arr, 0, int64(arr.Len()), outputManifest, rowGroupWriter, fieldNum)
if colWriterErr != nil {
return colWriterErr
}
if err := colWriter.Write(ctx); err != nil {
return err
}
arr = transformed
}
colWriter, colWriterErr := pqarrow.NewArrowColumnWriter(arr, 0, int64(arr.Len()), outputManifest, rowGroupWriter, fieldNum)
if colWriterErr != nil {
return colWriterErr
}
if err := colWriter.Write(ctx); err != nil {
numRowsInGroup, err := rowGroupWriter.NumRows()
if err != nil {
return err
}
numRowsWritten += int64(numRowsInGroup)
if numRowsWritten >= numRows {
break
}
}
} else {
numRowGroups := fileReader.NumRowGroups()
for rowGroupIndex := 0; rowGroupIndex < numRowGroups; rowGroupIndex += 1 {
rowGroupReader := arrowReader.RowGroup(rowGroupIndex)
rowGroupWriter := fileWriter.AppendRowGroup()
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
arr, readErr := rowGroupReader.Column(fieldNum).Read(ctx)
if readErr != nil {
return readErr
}
if config.TransformColumn != nil {
inputField := inputManifest.Fields[fieldNum].Field
outputField := outputManifest.Fields[fieldNum].Field
transformed, err := config.TransformColumn(inputField, outputField, arr)
if err != nil {
return err
}
if transformed.DataType() != outputField.Type {
return fmt.Errorf("transform generated an unexpected type, got %s, expected %s", transformed.DataType().Name(), outputField.Type.Name())
}
arr = transformed
}
colWriter, colWriterErr := pqarrow.NewArrowColumnWriter(arr, 0, int64(arr.Len()), outputManifest, rowGroupWriter, fieldNum)
if colWriterErr != nil {
return colWriterErr
}
if err := colWriter.Write(ctx); err != nil {
return err
}
}
}
}

Expand Down
Loading
Loading