Skip to content

Commit

Permalink
fix the mqtt contentType issue
Browse files Browse the repository at this point in the history
Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa committed May 23, 2024
1 parent cbba3fd commit 2cc07aa
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 14 deletions.
7 changes: 2 additions & 5 deletions protocol/mqtt_paho/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import (
)

const (
prefix = "ce-"
contentType = "Content-Type"
prefix = "ce-"
)

var specs = spec.WithPrefix(prefix)
Expand All @@ -42,7 +41,7 @@ func NewMessage(msg *paho.Publish) *Message {
var v spec.Version
if msg.Properties != nil {
// Use properties.User["Content-type"] to determine if message is structured
if s := msg.Properties.User.Get(contentType); format.IsFormat(s) {
if s := msg.Properties.ContentType; format.IsFormat(s) {
f = format.Lookup(s)
} else if s := msg.Properties.User.Get(specs.PrefixedSpecVersionName()); s != "" {
v = specs.Version(s)
Expand Down Expand Up @@ -88,8 +87,6 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)
} else {
err = encoder.SetExtension(strings.TrimPrefix(userProperty.Key, prefix), userProperty.Value)
}
} else if userProperty.Key == contentType {
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(userProperty.Value))
}
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion protocol/mqtt_paho/v2/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestReadStructured(t *testing.T) {
msg: &paho.Publish{
Payload: []byte(""),
Properties: &paho.PublishProperties{
User: []paho.UserProperty{{Key: contentType, Value: event.ApplicationCloudEventsJSON}},
ContentType: event.ApplicationCloudEventsJSON,
},
},
},
Expand Down
12 changes: 4 additions & 8 deletions protocol/mqtt_paho/v2/write_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ var (

func (b *pubMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error {
if b.Properties == nil {
b.Properties = &paho.PublishProperties{
User: make([]paho.UserProperty, 0),
}
b.Properties = &paho.PublishProperties{}
}
b.Properties.User.Add(contentType, f.MediaType())
b.Properties.ContentType = f.MediaType()
var buf bytes.Buffer
_, err := io.Copy(&buf, event)
if err != nil {
Expand Down Expand Up @@ -85,15 +83,13 @@ func (b *pubMessageWriter) SetData(reader io.Reader) error {
func (b *pubMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error {
if attribute.Kind() == spec.DataContentType {
if value == nil {
b.removeProperty(contentType)
b.Properties.ContentType = ""
}
s, err := types.Format(value)
if err != nil {
return err
}
if err := b.addProperty(contentType, s); err != nil {
return err
}
b.Properties.ContentType = s
} else {
if value == nil {
b.removeProperty(prefix + attribute.Name())
Expand Down
2 changes: 2 additions & 0 deletions samples/mqtt/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func main() {

for i := 0; i < count; i++ {
e := cloudevents.NewEvent()
e.SetExtension("eventkey", "eventvalue")
e.SetID(uuid.New().String())
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/samples/mqtt/sender")
Expand All @@ -62,6 +63,7 @@ func main() {
if err != nil {
log.Printf("failed to set data: %v", err)
}
ctx = cloudevents.WithEncodingStructured(ctx)
if result := c.Send(
cecontext.WithTopic(ctx, "test-topic"),
e,
Expand Down

0 comments on commit 2cc07aa

Please sign in to comment.