Skip to content

Commit

Permalink
Merge pull request #1063 from yanmxa/br_mqtt_stru
Browse files Browse the repository at this point in the history
Fix the `content-type` issue for the MQTT protocol
  • Loading branch information
embano1 authored Jun 14, 2024
2 parents e8fccd2 + 5754cf9 commit 972dfc7
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
16 changes: 10 additions & 6 deletions protocol/mqtt_paho/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import (
)

const (
prefix = "ce-"
contentType = "Content-Type"
prefix = "ce-"
)

var specs = spec.WithPrefix(prefix)
Expand All @@ -41,8 +40,7 @@ func NewMessage(msg *paho.Publish) *Message {
var f format.Format
var v spec.Version
if msg.Properties != nil {
// Use properties.User["Content-type"] to determine if message is structured
if s := msg.Properties.User.Get(contentType); format.IsFormat(s) {
if s := msg.Properties.ContentType; format.IsFormat(s) {
f = format.Lookup(s)
} else if s := msg.Properties.User.Get(specs.PrefixedSpecVersionName()); s != "" {
v = specs.Version(s)
Expand Down Expand Up @@ -88,14 +86,20 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)
} else {
err = encoder.SetExtension(strings.TrimPrefix(userProperty.Key, prefix), userProperty.Value)
}
} else if userProperty.Key == contentType {
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(userProperty.Value))
}
if err != nil {
return
}
}

contentType := m.internal.Properties.ContentType
if contentType != "" {
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), contentType)
if err != nil {
return err
}
}

if m.internal.Payload != nil {
return encoder.SetData(bytes.NewBuffer(m.internal.Payload))
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/mqtt_paho/v2/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestReadStructured(t *testing.T) {
msg: &paho.Publish{
Payload: []byte(""),
Properties: &paho.PublishProperties{
User: []paho.UserProperty{{Key: contentType, Value: event.ApplicationCloudEventsJSON}},
ContentType: event.ApplicationCloudEventsJSON,
},
},
},
Expand Down
12 changes: 4 additions & 8 deletions protocol/mqtt_paho/v2/write_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ var (

func (b *pubMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error {
if b.Properties == nil {
b.Properties = &paho.PublishProperties{
User: make([]paho.UserProperty, 0),
}
b.Properties = &paho.PublishProperties{}
}
b.Properties.User.Add(contentType, f.MediaType())
b.Properties.ContentType = f.MediaType()
var buf bytes.Buffer
_, err := io.Copy(&buf, event)
if err != nil {
Expand Down Expand Up @@ -85,15 +83,13 @@ func (b *pubMessageWriter) SetData(reader io.Reader) error {
func (b *pubMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error {
if attribute.Kind() == spec.DataContentType {
if value == nil {
b.removeProperty(contentType)
b.Properties.ContentType = ""
}
s, err := types.Format(value)
if err != nil {
return err
}
if err := b.addProperty(contentType, s); err != nil {
return err
}
b.Properties.ContentType = s
} else {
if value == nil {
b.removeProperty(prefix + attribute.Name())
Expand Down

0 comments on commit 972dfc7

Please sign in to comment.