From 4c500f9c1800972bfec4e0fc566537a1859f0eb1 Mon Sep 17 00:00:00 2001 From: SunilSKamath4s Date: Tue, 22 Feb 2022 10:54:14 +0530 Subject: [PATCH] error check ffor ack send --- message/amqp_message.go | 25 +++++++++++++++++++++++++ message/message.go | 2 ++ message/nano_message.go | 16 ++++++++++++++++ message/nats_message.go | 16 ++++++++++++++++ message/nats_streaming_message.go | 8 ++++++++ message/pubnub_message.go | 10 ++++++++++ message/redis_message.go | 10 ++++++++++ 7 files changed, 87 insertions(+) diff --git a/message/amqp_message.go b/message/amqp_message.go index 6187f9a..46b658a 100644 --- a/message/amqp_message.go +++ b/message/amqp_message.go @@ -56,3 +56,28 @@ func (m AmqpMessage) SendAck(ackMessage ...[]byte) { func (m AmqpMessage) SendNack(ackMessage ...[]byte) { m.RawMessage.Nack(false, true) } + +func (m AmqpMessage) SendAckWithError(ackMessage ...[]byte) error { + if val, ok := m.GetProperty("protocol_type"); ok && val == "reqrep" { + resp := []byte("OK") + if len(ackMessage) > 0 { + resp = ackMessage[0] + } + m.Responder.Publish( + "", + m.RawMessage.GetReplyTo(), + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + CorrelationId: m.RawMessage.GetCorrelationId(), + Body: resp, + }, + ) + } + return m.RawMessage.Ack(false) +} + +func (m AmqpMessage) SendNackWithError(ackMessage ...[]byte) error { + return m.RawMessage.Nack(false, true) +} diff --git a/message/message.go b/message/message.go index 705d15c..6d5809d 100644 --- a/message/message.go +++ b/message/message.go @@ -19,6 +19,8 @@ type Message interface { SetProperty(string, string) SendAck(...[]byte) SendNack(...[]byte) + SendAckWithError(...[]byte) error + SendNackWithError(...[]byte) error } type RawMessage interface { diff --git a/message/nano_message.go b/message/nano_message.go index 1177939..e8ccaa6 100644 --- a/message/nano_message.go +++ b/message/nano_message.go @@ -45,3 +45,19 @@ func (m NanoMessage) SendNack(ackMessage ...[]byte) { m.Responder.Send(resp) return } + +func (m NanoMessage) SendAckWithError(ackMsg ...[]byte) error { + resp := []byte("OK") + if len(ackMsg) > 0 { + resp = ackMsg[0] + } + return m.Responder.Send(resp) +} + +func (m NanoMessage) SendNackWithError(ackMessage ...[]byte) error { + resp := []byte("ERR") + if len(ackMessage) > 0 { + resp = ackMessage[0] + } + return m.Responder.Send(resp) +} diff --git a/message/nats_message.go b/message/nats_message.go index a9700f4..61830e2 100644 --- a/message/nats_message.go +++ b/message/nats_message.go @@ -45,3 +45,19 @@ func (m NatsMessage) SendNack(ackMessage ...[]byte) { m.Responder.Publish(m.RawMessage.GetReplyTo(), resp) return } + +func (m NatsMessage) SendAckWithError(ackMsg ...[]byte) error { + resp := []byte("OK") + if len(ackMsg) > 0 { + resp = ackMsg[0] + } + return m.Responder.Publish(m.RawMessage.GetReplyTo(), resp) +} + +func (m NatsMessage) SendNackWithError(ackMessage ...[]byte) error { + resp := []byte("NOK") + if len(ackMessage) > 0 { + resp = ackMessage[0] + } + return m.Responder.Publish(m.RawMessage.GetReplyTo(), resp) +} diff --git a/message/nats_streaming_message.go b/message/nats_streaming_message.go index 8bb0a79..b21291a 100644 --- a/message/nats_streaming_message.go +++ b/message/nats_streaming_message.go @@ -36,3 +36,11 @@ func (m NatsStreamMessage) SendAck(ackMsg ...[]byte) { func (m NatsStreamMessage) SendNack(ackMessage ...[]byte) { return } + +func (m NatsStreamMessage) SendAckWithError(ackMsg ...[]byte) error { + return m.RawMessage.Ack(false) +} + +func (m NatsStreamMessage) SendNackWithError(ackMessage ...[]byte) error { + return nil +} diff --git a/message/pubnub_message.go b/message/pubnub_message.go index f05670d..db3ef66 100644 --- a/message/pubnub_message.go +++ b/message/pubnub_message.go @@ -37,3 +37,13 @@ func (m *PubnubMessage) SendNack(ackMessage ...[]byte) { m.SetProperty("ack", "NOK") return } + +func (m *PubnubMessage) SendAckWithError(ackMsg ...[]byte) error { + m.SetProperty("ack", "OK") + return nil +} + +func (m *PubnubMessage) SendNackWithError(ackMessage ...[]byte) error { + m.SetProperty("ack", "NOK") + return nil +} diff --git a/message/redis_message.go b/message/redis_message.go index 497d696..016e599 100644 --- a/message/redis_message.go +++ b/message/redis_message.go @@ -37,3 +37,13 @@ func (m *RedisMessage) SendNack(ackMessage ...[]byte) { m.SetProperty("ack", "NOK") return } + +func (m *RedisMessage) SendAckWithError(ackMsg ...[]byte) error { + m.SetProperty("ack", "OK") + return nil +} + +func (m *RedisMessage) SendNackWithError(ackMessage ...[]byte) error { + m.SetProperty("ack", "NOK") + return nil +}