-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmessage.go
128 lines (114 loc) · 3.22 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package zkafka
import (
"context"
"errors"
"sync"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
// Message is a container for a kafka message.
//
// A Message embeds a sync.Once and therefore must not be copied after first
// use; pass it around as *Message (all of its methods use pointer receivers).
type Message struct {
	// Key is the message key as a string. See isKeyNil for how an absent key
	// is told apart from an empty one.
	Key string
	// There's a difference between a nil key and an empty key. A nil key gets assigned a topic partition by kafka via round-robin.
	// An empty key is treated as a key with a value of "" and is assigned to a topic partition via the hash of the key
	// (so it will consistently go to the same partition).
	isKeyNil bool
	// Headers maps each header key to its raw byte value.
	Headers map[string][]byte
	// Offset is presumably the offset of this message within its partition — TODO confirm against the code that populates it.
	Offset int64
	// Partition is presumably the topic partition this message belongs to — TODO confirm.
	Partition int32
	// Topic is the topic name; it is forwarded to the formatter when the message is decoded.
	Topic string
	// GroupID is presumably the consumer group associated with this message — TODO confirm.
	GroupID string
	// TimeStamp is presumably the kafka timestamp of the message — TODO confirm.
	TimeStamp time.Time
	// value holds the raw payload bytes. Value returns a copy of it and
	// Decode hands it to the formatter; a nil value makes Decode fail.
	value []byte
	// topicPartition is not read by any code in this file.
	// NOTE(review): presumably retained for offset bookkeeping — confirm.
	topicPartition kafka.TopicPartition
	// fmt is the formatter used to unmarshal value. When it is nil, decoding
	// returns an error instead of attempting to unmarshal.
	fmt kFormatter
	// doneFunc is the completion callback invoked by Done and DoneWithContext.
	doneFunc func(ctx context.Context)
	// doneOnce ensures doneFunc is invoked at most once across Done and DoneWithContext.
	doneOnce sync.Once
	// schema is passed to the formatter together with the topic and payload when decoding.
	schema string
}
// DoneWithContext is used to alert that message processing has completed.
// This marks the message offset to be committed.
//
// ctx is forwarded to the message's completion callback. The callback runs at
// most once per message: DoneWithContext and Done share the same sync.Once, so
// only the first call to either has an effect.
//
// Calling DoneWithContext on a nil *Message, or on a Message without a
// completion callback (for example a zero-value Message), is a no-op.
func (m *Message) DoneWithContext(ctx context.Context) {
	// Guard both the receiver and the callback: dereferencing a nil receiver
	// or invoking a nil func would panic.
	if m == nil || m.doneFunc == nil {
		return
	}
	m.doneOnce.Do(func() {
		m.doneFunc(ctx)
	})
}
// Done is used to alert that message processing has completed.
// This marks the message offset to be committed.
//
// The completion callback is invoked with context.Background() and runs at
// most once per message; subsequent calls are no-ops.
//
// Calling Done on a nil *Message, or on a Message without a completion
// callback (for example a zero-value Message), is a no-op.
func (m *Message) Done() {
	// A nil callback would panic when invoked, so treat it like a nil receiver.
	if m == nil || m.doneFunc == nil {
		return
	}
	m.doneOnce.Do(func() {
		m.doneFunc(context.Background())
	})
}
// Decode reads message data and stores it in the value pointed to by v.
//
// It returns an error when the message is nil or carries no payload (nil
// value), when no formatter is configured for the message, or when the
// formatter itself fails to unmarshal the payload into v.
func (m *Message) Decode(v any) error {
	// A nil receiver is reported as an empty message rather than panicking on
	// the field access below.
	if m == nil || m.value == nil {
		return errors.New("message is empty")
	}
	return m.unmarshall(v)
}
// unmarshall decodes the message payload into target using the message's
// formatter. The formatter receives the topic, the raw payload and the schema
// alongside the target. An error is returned if no formatter is set.
func (m *Message) unmarshall(target any) error {
	formatter := m.fmt
	if formatter == nil {
		return errors.New("formatter is not supplied to decode kafka message")
	}
	req := unmarshReq{
		topic:  m.Topic,
		data:   m.value,
		target: target,
		schema: m.schema,
	}
	return formatter.unmarshal(req)
}
// Value returns a copy of the message payload, so callers may modify the
// result without affecting the message. It returns nil when the message is
// nil or has no payload. Useful for debugging.
func (m *Message) Value() []byte {
	if m == nil {
		return nil
	}
	src := m.value
	if src == nil {
		return nil
	}
	// Allocate exactly len(src) capacity so the append fills a fresh backing
	// array without reallocating.
	return append(make([]byte, 0, len(src)), src...)
}
// makeProducerMessageRaw builds a kafka.Message addressed to topic with
// kafka.PartitionAny as its partition and value as its payload.
// A nil key leaves the message key unset; a non-nil key (including a pointer
// to the empty string) is copied into the message as bytes.
// The context argument is ignored.
func makeProducerMessageRaw(_ context.Context, topic string, key *string, value []byte) kafka.Message {
	var keyBytes []byte
	if key != nil {
		keyBytes = []byte(*key)
	}
	return kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny,
		},
		Key:   keyBytes,
		Value: value,
	}
}
// addHeaders applies every entry of headers to kafkaMessage and returns the
// resulting message. Existing headers with a matching key have their value
// overwritten; other entries are appended as new headers.
//
// Map iteration order is unspecified, so the relative order of newly appended
// headers is not deterministic. Although the message is passed by value, its
// Headers slice shares a backing array with the caller's copy, so overwritten
// values are visible through the caller's slice as well.
func addHeaders(kafkaMessage kafka.Message, headers map[string][]byte) kafka.Message {
	out := &kafkaMessage
	for name, val := range headers {
		addStringAttribute(out, name, val)
	}
	return *out
}
// addStringAttribute sets header k to v on msg. If a header with key k is
// already present, the first such header has its value replaced in place;
// otherwise a new header is appended to msg.Headers.
func addStringAttribute(msg *kafka.Message, k string, v []byte) {
	for idx := range msg.Headers {
		if msg.Headers[idx].Key != k {
			continue
		}
		msg.Headers[idx].Value = v
		return
	}
	msg.Headers = append(msg.Headers, kafka.Header{Key: k, Value: v})
}
// headers extracts the headers of a kafka message into a map keyed by header
// key. When the same key appears more than once, the value of the last
// occurrence wins. The returned map is never nil, and its values alias the
// header value slices of msg (they are not copied).
func headers(msg kafka.Message) map[string][]byte {
	// The header count is an upper bound on the number of distinct keys, so
	// sizing the map up front avoids incremental growth.
	res := make(map[string][]byte, len(msg.Headers))
	for _, h := range msg.Headers {
		res[h.Key] = h.Value
	}
	return res
}
// Response is a kafka response carrying the Partition the message was sent to
// along with the Offset it was assigned.
type Response struct {
	// Partition is the partition the message was sent to.
	Partition int32
	// Offset is the offset assigned to the message.
	Offset int64
}