This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

add avro reading [nt]
flarco committed Dec 1, 2023
1 parent b2b676b commit 0109100
Showing 9 changed files with 249 additions and 4 deletions.
7 changes: 6 additions & 1 deletion filesys/fs.go
@@ -200,6 +200,7 @@ const FileTypeCsv FileType = "csv"
const FileTypeXml FileType = "xml"
const FileTypeJson FileType = "json"
const FileTypeParquet FileType = "parquet"
const FileTypeAvro FileType = "avro"
const FileTypeJsonLines FileType = "jsonlines"

func (ft FileType) Ext() string {
@@ -454,6 +455,8 @@ func (fs *BaseFileSysClient) GetDatastream(urlStr string) (ds *iop.Datastream, e
err = ds.ConsumeXmlReader(reader)
case FileTypeParquet:
err = ds.ConsumeParquetReader(reader)
case FileTypeAvro:
err = ds.ConsumeAvroReader(reader)
default:
err = ds.ConsumeCsvReader(reader)
}
@@ -1156,6 +1159,8 @@ func MergeReaders(fs FileSysClient, fileType FileType, paths ...string) (ds *iop
err = ds.ConsumeXmlReader(pipeR)
case FileTypeParquet:
err = ds.ConsumeParquetReader(pipeR)
case FileTypeAvro:
err = ds.ConsumeAvroReader(pipeR)
case FileTypeCsv:
err = ds.ConsumeCsvReader(pipeR)
default:
@@ -1214,7 +1219,7 @@ func ProcessStreamViaTempFile(ds *iop.Datastream) (nDs *iop.Datastream, err erro
func InferFileFormat(path string) FileType {
path = strings.TrimSpace(strings.ToLower(path))

for _, fileType := range []FileType{FileTypeJsonLines, FileTypeJson, FileTypeXml, FileTypeParquet} {
for _, fileType := range []FileType{FileTypeJsonLines, FileTypeJson, FileTypeXml, FileTypeParquet, FileTypeAvro} {
ext := fileType.Ext()
if strings.HasSuffix(path, ext) || strings.Contains(path, ext+".") {
return fileType
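Taken together, the hunks above register the new file type, dispatch it in GetDatastream and MergeReaders, and teach InferFileFormat to match the .avro extension. A minimal sketch of how a caller might exercise the new path, mirroring the test added further below (the import paths github.com/flarco/dbio and github.com/flarco/dbio/filesys are assumed from the repo layout):

package main

import (
	"log"

	"github.com/flarco/dbio"
	"github.com/flarco/dbio/filesys"
)

func main() {
	fs, err := filesys.NewFileSysClient(dbio.TypeFileLocal)
	if err != nil {
		log.Fatal(err)
	}

	// Each *.avro file found under this folder is now inferred as
	// FileTypeAvro and consumed via ConsumeAvroReader.
	df, err := fs.ReadDataflow("test/test1/avro")
	if err != nil {
		log.Fatal(err)
	}

	data, err := df.Collect()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("%d rows, %d columns", len(data.Rows), len(data.Columns))
}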
2 changes: 2 additions & 0 deletions filesys/fs_local.go
@@ -119,6 +119,8 @@ func (fs *LocalFileSysClient) GetDatastream(path string) (ds *iop.Datastream, er
err = ds.ConsumeXmlReader(bufio.NewReader(file))
case FileTypeParquet:
err = ds.ConsumeParquetReaderSeeker(file)
case FileTypeAvro:
err = ds.ConsumeAvroReaderSeeker(file)
case FileTypeCsv:
err = ds.ConsumeCsvReader(bufio.NewReader(file))
default:
73 changes: 72 additions & 1 deletion filesys/fs_test.go
@@ -1,6 +1,9 @@
package filesys

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
@@ -9,6 +12,7 @@ import (

"github.com/flarco/dbio"
"github.com/flarco/g/net"
"github.com/linkedin/goavro/v2"
"github.com/spf13/cast"

"github.com/flarco/dbio/iop"
@@ -240,6 +244,72 @@ func TestFileSysLocalParquet(t *testing.T) {

}

func TestFileSysLocalAvro(t *testing.T) {
t.Parallel()

fs, err := NewFileSysClient(dbio.TypeFileLocal)
assert.NoError(t, err)

df1, err := fs.ReadDataflow("test/test1/avro")
assert.NoError(t, err)

data1, err := df1.Collect()
assert.NoError(t, err)
assert.EqualValues(t, 2, len(data1.Rows))
assert.EqualValues(t, 3, len(data1.Columns))

avroSchema := `
{
"type": "record",
"name": "test_schema",
"fields": [
{
"name": "time",
"type": "long"
},
{
"name": "customer",
"type": "string"
}
]
}`

// Writing OCF data
var ocfFileContents bytes.Buffer
writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: &ocfFileContents,
Schema: avroSchema,
})
if err != nil {
fmt.Println(err)
}
err = writer.Append([]map[string]interface{}{
{
"time": 1617104831727,
"customer": "customer1",
},
{
"time": 1717104831727,
"customer": "customer2",
},
})
assert.NoError(t, err)
// fmt.Println("ocfFileContents", ocfFileContents.String())

reader := strings.NewReader(ocfFileContents.String())

ds := iop.NewDatastream(nil)
err = ds.ConsumeAvroReader(reader)
assert.NoError(t, err)

data, err := ds.Collect(0)
assert.NoError(t, err)

assert.EqualValues(t, 2, len(data.Rows))
assert.EqualValues(t, 2, len(data.Columns))

}

func TestFileSysDOSpaces(t *testing.T) {
fs, err := NewFileSysClient(
dbio.TypeFileS3,
@@ -286,6 +356,7 @@ func TestFileSysDOSpaces(t *testing.T) {
assert.NotContains(t, paths, testPath)

paths, err = fs.ListRecursive("s3://ocral/test/")
assert.NoError(t, err)

// Test datastream
df, err := fs.ReadDataflow("s3://ocral/test/")
@@ -395,7 +466,7 @@ func TestFileSysS3(t *testing.T) {
return
}

testBytes, err := ioutil.ReadAll(reader2)
testBytes, err := io.ReadAll(reader2)
assert.NoError(t, err)

assert.Equal(t, testString, string(testBytes))
Binary file added filesys/test/test1/avro/twitter.avro
1 change: 1 addition & 0 deletions go.mod
@@ -168,6 +168,7 @@ require (
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/linkedin/goavro/v2 v2.12.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-ieproxy v0.0.9 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -692,6 +692,8 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg=
github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
119 changes: 119 additions & 0 deletions iop/avro.go
@@ -0,0 +1,119 @@
package iop

import (
"io"
"strings"

"github.com/flarco/g"
"github.com/linkedin/goavro/v2"
"github.com/samber/lo"
)

// Avro is an Avro OCF reader object
type Avro struct {
Path string
Reader *goavro.OCFReader
Data *Dataset
colMap map[string]int
codec *goavro.Codec
}

func NewAvroStream(reader io.ReadSeeker, columns Columns) (a *Avro, err error) {
ar, err := goavro.NewOCFReader(reader)
if err != nil {
err = g.Error(err, "could not create avro reader")
return
}

a = &Avro{Reader: ar, codec: ar.Codec()}
a.colMap = a.Columns().FieldMap(true)

return
}

func (a *Avro) Columns() Columns {

typeMap := map[string]ColumnType{
"string": StringType,
"int": IntegerType,
"long": BigIntType,
"float": DecimalType,
"double": DecimalType,
"bytes": BinaryType,
"null": StringType,
"array": JsonType,
"map": JsonType,
"record": JsonType,
"enum": StringType,
}

type avroField struct {
Name string `json:"name"`
Type any `json:"type"`
}

type avroSchema struct {
Name string `json:"name"`
Fields []avroField `json:"fields"`
}

schema := avroSchema{}

g.Unmarshal(a.codec.Schema(), &schema)

fields := lo.Map(
schema.Fields,
func(f avroField, i int) string { return f.Name },
)

cols := NewColumnsFromFields(fields...)
for i, field := range schema.Fields {
key := g.Marshal(field.Type)
key = strings.TrimPrefix(key, `"`)
key = strings.TrimSuffix(key, `"`)

if typ, ok := typeMap[key]; ok {
cols[i].Type = typ
}
}

return cols
}

func (a *Avro) nextFunc(it *Iterator) bool {
if !a.Reader.Scan() {
return false
}

if err := a.Reader.Err(); err != nil {
it.Context.CaptureErr(g.Error(err, "could not read Avro row"))
return false
}

datum, err := a.Reader.Read()
if err != nil {
it.Context.CaptureErr(g.Error(err, "could not read Avro record"))
return false
}

buf, err := a.codec.TextualFromNative(nil, datum)
if err != nil {
it.Context.CaptureErr(g.Error(err, "could not convert to Avro record"))
return false
}

rec, err := g.JSONUnmarshalToMap(buf)
if err != nil {
it.Context.CaptureErr(g.Error(err, "could not unmarshal Avro record"))
return false
}

it.Row = make([]interface{}, len(it.ds.Columns))
for k, v := range rec {
col := it.ds.Columns[a.colMap[strings.ToLower(k)]]
i := col.Position - 1
it.Row[i] = v
}

return true
}
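nextFunc above decodes each record in two hops: goavro returns a native Go datum, TextualFromNative renders it as Avro-JSON text, and the resulting map is placed into the row by column position. A self-contained sketch of that same round-trip using only the goavro API (schema and values here are illustrative, not from the commit):

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/linkedin/goavro/v2"
)

func main() {
	schema := `{"type":"record","name":"t","fields":[
	{"name":"id","type":"long"},{"name":"name","type":"string"}]}`

	// Write a tiny OCF document in memory.
	var buf bytes.Buffer
	w, err := goavro.NewOCFWriter(goavro.OCFConfig{W: &buf, Schema: schema})
	if err != nil {
		log.Fatal(err)
	}
	if err = w.Append([]map[string]interface{}{{"id": 1, "name": "a"}}); err != nil {
		log.Fatal(err)
	}

	// Read it back the way Avro.nextFunc does: scan a native datum,
	// then render it as Avro-JSON text before mapping it into a row.
	r, err := goavro.NewOCFReader(&buf)
	if err != nil {
		log.Fatal(err)
	}
	for r.Scan() {
		datum, err := r.Read()
		if err != nil {
			log.Fatal(err)
		}
		txt, err := r.Codec().TextualFromNative(nil, datum)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(string(txt)) // e.g. {"id":1,"name":"a"}
	}
}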
47 changes: 46 additions & 1 deletion iop/datastream.go
@@ -777,7 +777,7 @@ func (ds *Datastream) ConsumeCsvReader(reader io.Reader) (err error) {
return
}

// ConsumeParquetReader uses the provided reader to stream rows
// ConsumeParquetReaderSeeker uses the provided reader to stream rows
func (ds *Datastream) ConsumeParquetReaderSeeker(reader io.ReadSeeker) (err error) {
p, err := NewParquetStream(reader, Columns{})
if err != nil {
@@ -822,6 +822,51 @@ func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error) {
return ds.ConsumeParquetReaderSeeker(file)
}

// ConsumeAvroReaderSeeker uses the provided reader to stream rows
func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error) {
a, err := NewAvroStream(reader, Columns{})
if err != nil {
return g.Error(err, "could create parquet stream")
}

ds.Columns = a.Columns()
ds.Inferred = true
ds.it = ds.NewIterator(ds.Columns, a.nextFunc)

err = ds.Start()
if err != nil {
return g.Error(err, "could start datastream")
}

return
}

// ConsumeAvroReader uses the provided reader to stream rows
func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error) {
// need to write to temp file prior
tempDir := strings.TrimRight(strings.TrimRight(os.TempDir(), "/"), "\\")
avroPath := path.Join(tempDir, g.NewTsID("avro.temp")+".avro")
ds.Defer(func() { os.Remove(avroPath) })

file, err := os.Create(avroPath)
if err != nil {
return g.Error(err, "Unable to create temp file: "+avroPath)
}

bw, err := io.Copy(file, reader)
if err != nil {
return g.Error(err, "Unable to write to temp file: "+avroPath)
}
g.DebugLow("wrote %d bytes to %s", bw, avroPath)

_, err = file.Seek(0, 0) // reset to beginning
if err != nil {
return g.Error(err, "Unable to seek to beginning of temp file: "+avroPath)
}

return ds.ConsumeAvroReaderSeeker(file)
}

// AddBytes add bytes as processed
func (ds *Datastream) AddBytes(b int64) {
ds.Bytes = ds.Bytes + cast.ToUint64(b)
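A short usage sketch for the two new entry points (the import path github.com/flarco/dbio/iop is assumed from the repo layout; the file name is illustrative). An *os.File already satisfies io.ReadSeeker, so the Seeker variant avoids the temp-file spooling that ConsumeAvroReader performs for plain readers:

package main

import (
	"log"
	"os"

	"github.com/flarco/dbio/iop"
)

func main() {
	file, err := os.Open("twitter.avro") // any Avro OCF file
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// *os.File implements io.ReadSeeker, so the Seeker variant applies
	// directly and no temp-file copy is made.
	ds := iop.NewDatastream(nil)
	if err := ds.ConsumeAvroReaderSeeker(file); err != nil {
		log.Fatal(err)
	}

	data, err := ds.Collect(0)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("%d rows, %d columns", len(data.Rows), len(data.Columns))
}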
2 changes: 1 addition & 1 deletion iop/parquet.go
@@ -26,7 +26,7 @@ type Parquet struct {
func NewParquetStream(reader io.ReadSeeker, columns Columns) (p *Parquet, err error) {
fr, err := goparquet.NewFileReader(reader, columns.Names()...)
if err != nil {
err = g.Error(err, "could not reader parquet reader")
err = g.Error(err, "could not read parquet reader")
return
}
p = &Parquet{Reader: fr}
