Skip to content

Commit

Permalink
feat: make all fields in records nullable
Browse files Browse the repository at this point in the history
Avro handles schema evolution slightly different than protobuf. By
default fields are REQUIRED in Avro unless specifically made NULLABLE.

This commit changes all fields in protobuf messages to be NULLABLE in
their mapped Avro record.
  • Loading branch information
ericwenn committed Aug 10, 2021
1 parent fb0b7c8 commit afb39ac
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 316 deletions.
37 changes: 19 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,24 @@ Avro schema inference for arbitrary protobuf messages.

```go
func ExampleInferSchema() {
msg := &library.Book{}
schema, err := protoavro.InferSchema(msg.ProtoReflect().Descriptor())
if err != nil {
panic(err)
}
expected := avro.Nullable(avro.Record{
Type: avro.RecordType,
Name: "Book",
Namespace: "google.example.library.v1",
Fields: []avro.Field{
{Name: "name", Type: avro.String()},
{Name: "author", Type: avro.String()},
{Name: "title", Type: avro.String()},
{Name: "read", Type: avro.Boolean()},
},
})
fmt.Println(cmp.Equal(expected, schema))
msg := &library.Book{}
schema, err := protoavro.InferSchema(msg.ProtoReflect().Descriptor())
if err != nil {
panic(err)
}
expected := avro.Nullable(avro.Record{
Type: avro.RecordType,
Name: "Book",
Namespace: "google.example.library.v1",
Fields: []avro.Field{
{Name: "name", Type: avro.Nullable(avro.String())},
{Name: "author", Type: avro.Nullable(avro.String())},
{Name: "title", Type: avro.Nullable(avro.String())},
{Name: "read", Type: avro.Nullable(avro.Boolean())},
},
})
fmt.Println(cmp.Equal(expected, schema))
// Output: true
}
```

Expand Down Expand Up @@ -99,7 +100,7 @@ func ExampleUnmarshaler() {

### Mapping

**Messages** are mapped as nullable records in Avro. Fields will have the same casing as in the protobuf descriptor.
**Messages** are mapped as nullable records in Avro. All fields will be nullable. Fields will have the same casing as in the protobuf descriptor.

**One of**s are mapped to nullable fields in Avro, where at most one field will be set at a time.

Expand Down
66 changes: 33 additions & 33 deletions encoding/protoavro/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func decodeField(data interface{}, val protoreflect.Message, f protoreflect.Fiel
val.Set(f, protoreflect.ValueOfMap(mp))
return nil
case f.IsList():
listData, ok := data.([]interface{})
if !ok {
return fmt.Errorf("expected list, got %T", data)
listData, err := decodeListLike(data, "array")
if err != nil {
return err
}
list := val.NewField(f).List()
for _, el := range listData {
Expand Down Expand Up @@ -85,57 +85,57 @@ func decodeField(data interface{}, val protoreflect.Message, f protoreflect.Fiel

func decodeFieldKind(data interface{}, mutable protoreflect.Value, f protoreflect.FieldDescriptor) (protoreflect.Value, error) {
switch f.Kind() {
case protoreflect.MessageKind, protoreflect.GroupKind:
if err := decodeMessage(data, mutable.Message()); err != nil {
return protoreflect.Value{}, err
}
return mutable, nil
case protoreflect.StringKind:
str, ok := data.(string)
if !ok {
return protoreflect.Value{}, fmt.Errorf("field %s: expected string, got %T", f.Name(), data)
str, err := decodeStringLike(data, "string")
if err != nil {
return protoreflect.Value{}, fmt.Errorf("field %s: %w", f.Name(), err)
}
return protoreflect.ValueOfString(str), nil
case protoreflect.BoolKind:
bo, ok := data.(bool)
if !ok {
return protoreflect.Value{}, fmt.Errorf("field %s: expected bool, got %T", f.Name(), data)
bo, err := decodeBoolLike(data, "boolean")
if err != nil {
return protoreflect.Value{}, fmt.Errorf("field %s: %w", f.Name(), err)
}
return protoreflect.ValueOfBool(bo), nil
case protoreflect.MessageKind, protoreflect.GroupKind:
if err := decodeMessage(data, mutable.Message()); err != nil {
return protoreflect.Value{}, err
}
return mutable, nil
case protoreflect.Int32Kind, protoreflect.Sfixed32Kind, protoreflect.Sint32Kind:
i, ok := data.(int32)
if !ok {
return protoreflect.Value{}, fmt.Errorf("field %s: expected int32, got %T", f.Name(), data)
i, err := decodeIntLike(data, "int")
if err != nil {
return protoreflect.Value{}, fmt.Errorf("field %s: %w", f.Name(), err)
}
return protoreflect.ValueOfInt32(i), nil
return protoreflect.ValueOfInt32(int32(i)), nil
case protoreflect.Int64Kind, protoreflect.Sfixed64Kind, protoreflect.Sint64Kind:
i, ok := data.(int64)
if !ok {
return protoreflect.Value{}, fmt.Errorf("field %s: expected int64, got %T", f.Name(), data)
i, err := decodeIntLike(data, "long")
if err != nil {
return protoreflect.Value{}, fmt.Errorf("field %s: %w", f.Name(), err)
}
return protoreflect.ValueOfInt64(i), nil
case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
i, ok := data.(int32)
if !ok {
return protoreflect.Value{}, fmt.Errorf("field %s: expected int32, got %T", f.Name(), data)
i, err := decodeIntLike(data, "int")
if err != nil {
return protoreflect.Value{}, fmt.Errorf("field %s: %w", f.Name(), err)
}
return protoreflect.ValueOfUint32(uint32(i)), nil
case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
i, ok := data.(int64)
if !ok {
return protoreflect.Value{}, fmt.Errorf("field %s: expected int64, got %T", f.Name(), data)
i, err := decodeIntLike(data, "long")
if err != nil {
return protoreflect.Value{}, fmt.Errorf("field %s: %w", f.Name(), err)
}
return protoreflect.ValueOfUint64(uint64(i)), nil
case protoreflect.BytesKind:
bs, ok := data.([]byte)
if !ok {
return protoreflect.Value{}, fmt.Errorf("field %s: expected []byte, got %T", f.Name(), data)
bs, err := decodeBytesLike(data, "bytes")
if err != nil {
return protoreflect.Value{}, fmt.Errorf("field %s: %w", f.Name(), err)
}
return protoreflect.ValueOfBytes(bs), nil
case protoreflect.EnumKind:
str, ok := data.(string)
if !ok {
return protoreflect.Value{}, fmt.Errorf("field %s: expected string, got %T", f.Name(), data)
str, err := decodeStringLike(data, string(f.Enum().FullName()))
if err != nil {
return protoreflect.Value{}, fmt.Errorf("field %s: %w", f.Name(), err)
}
if v := f.Enum().Values().ByName(protoreflect.Name(str)); v != nil {
return protoreflect.ValueOfEnum(v.Number()), nil
Expand Down
25 changes: 14 additions & 11 deletions encoding/protoavro/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func fieldJSON(field protoreflect.FieldDescriptor, value protoreflect.Value) (in
}
list = append(list, fieldValue)
}
return list, nil
return unionValue("array", list), nil
}
if field.IsMap() {
return encodeMap(field, value.Map())
Expand All @@ -82,31 +82,34 @@ func fieldKindJSON(field protoreflect.FieldDescriptor, value protoreflect.Value)
case protoreflect.MessageKind, protoreflect.GroupKind:
return messageJSON(value.Message())
case protoreflect.EnumKind:
return string(field.Enum().Values().Get(int(value.Enum())).Name()), nil
return unionValue(
string(field.Enum().FullName()),
string(field.Enum().Values().Get(int(value.Enum())).Name()),
), nil
case protoreflect.StringKind:
return value.String(), nil
return unionValue("string", value.String()), nil
case protoreflect.Int32Kind,
protoreflect.Fixed32Kind,
protoreflect.Sfixed32Kind,
protoreflect.Sint32Kind:
return int32(value.Int()), nil
return unionValue("int", int32(value.Int())), nil
case protoreflect.Uint32Kind:
return int32(value.Uint()), nil
return unionValue("int", int32(value.Uint())), nil
case protoreflect.Int64Kind,
protoreflect.Fixed64Kind,
protoreflect.Sfixed64Kind,
protoreflect.Sint64Kind:
return value.Int(), nil
return unionValue("long", value.Int()), nil
case protoreflect.Uint64Kind:
return int64(value.Uint()), nil
return unionValue("long", int64(value.Uint())), nil
case protoreflect.BoolKind:
return value.Bool(), nil
return unionValue("boolean", value.Bool()), nil
case protoreflect.BytesKind:
return value.Bytes(), nil
return unionValue("bytes", value.Bytes()), nil
case protoreflect.DoubleKind:
return value.Float(), nil
return unionValue("double", value.Float()), nil
case protoreflect.FloatKind:
return float32(value.Float()), nil
return unionValue("float", float32(value.Float())), nil
}
return value.Interface(), nil
}
Loading

0 comments on commit afb39ac

Please sign in to comment.