diff --git a/consumer/rabbitmq/consumer.go b/consumer/rabbitmq/consumer.go index 728a9cd..5862e1c 100644 --- a/consumer/rabbitmq/consumer.go +++ b/consumer/rabbitmq/consumer.go @@ -290,7 +290,8 @@ func buildMessage(consumerMeta map[string]interface{}, receivedMsg amqp.Delivery return msg, nil } -func (r *rabbitMQ) requeueMessage(consumerMeta map[string]interface{}, receivedMsg amqp.Delivery) func(ctx context.Context, delayFn goqueue.DelayFn) (err error) { +func (r *rabbitMQ) requeueMessage(consumerMeta map[string]interface{}, + receivedMsg amqp.Delivery) func(ctx context.Context, delayFn goqueue.DelayFn) (err error) { return func(ctx context.Context, delayFn goqueue.DelayFn) (err error) { if delayFn == nil { delayFn = goqueue.DefaultDelayFn diff --git a/publisher/option.go b/publisher/option.go index 64b6a88..b474ae4 100644 --- a/publisher/option.go +++ b/publisher/option.go @@ -1,6 +1,13 @@ package publisher -import "github.com/bxcodec/goqueue" +import ( + "github.com/bxcodec/goqueue" + headerVal "github.com/bxcodec/goqueue/headers/value" +) + +const ( + DefaultContentType = headerVal.ContentTypeJSON +) // Option define the option property type Option struct { diff --git a/publisher/rabbitmq/publisher.go b/publisher/rabbitmq/publisher.go index 4f8cf5a..8ac58c6 100644 --- a/publisher/rabbitmq/publisher.go +++ b/publisher/rabbitmq/publisher.go @@ -41,6 +41,9 @@ func New( } func (r *rabbitMQ) Publish(ctx context.Context, m goqueue.Message) (err error) { + if m.ContentType == "" { + m.ContentType = publisher.DefaultContentType + } publishFunc := middleware.ApplyPublisherMiddleware( r.buildPublisher(), r.option.Middlewares..., @@ -79,7 +82,6 @@ func (r *rabbitMQ) buildPublisher() goqueue.PublisherFunc { m.Headers = headers m.ServiceAgent = headerVal.RabbitMQ - m.ContentType = headerVal.ContentType(m.ContentType) m.Timestamp = timestamp m.ID = id data, err := goqueue.GetGoquEncoding(m.ContentType).Encode(ctx, m)