-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathlist_partition_reassignments_request.go
112 lines (96 loc) · 2.89 KB
/
list_partition_reassignments_request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package healer
import (
"encoding/binary"
)
// ListPartitionReassignmentsRequest is a request to kafka to list partition reassignments.
//
// It embeds the common *RequestHeader and carries a timeout plus an optional
// set of topic/partitions. Create one with NewListPartitionReassignmentsRequest
// and add topic/partitions with AddTP.
type ListPartitionReassignmentsRequest struct {
*RequestHeader
// TimeoutMS is the request timeout (milliseconds, going by the field name).
// Encode writes it as a 4-byte big-endian value right after the header.
TimeoutMS int32
// Topics lists the topic/partitions carried by the request. Encode
// distinguishes a nil slice (written as count 0) from a non-nil one
// (written as 1+len(Topics)).
// NOTE(review): presumably a nil slice asks the broker for every ongoing
// reassignment; confirm against the Kafka protocol documentation.
Topics []struct {
// Name is the topic name.
Name string
// Partitions holds the partition IDs requested for this topic.
Partitions []int32
// TaggedFields is the per-topic tagged-field section; Encode writes it
// after the topic's partitions.
TaggedFields TaggedFields
}
// TaggedFields is the request-level tagged-field section; Encode writes it last.
TaggedFields TaggedFields
}
// NewListPartitionReassignmentsRequest builds a ListPartitionReassignmentsRequest
// for the given client ID and timeout.
//
// The header is populated with the ListPartitionReassignments API key at API
// version 0. No topics are set on the returned request; use AddTP to add
// topic/partitions.
func NewListPartitionReassignmentsRequest(clientID string, timeoutMS int32) ListPartitionReassignmentsRequest {
	var req ListPartitionReassignmentsRequest
	req.TimeoutMS = timeoutMS
	req.RequestHeader = &RequestHeader{
		APIKey:     API_ListPartitionReassignments,
		APIVersion: 0,
		ClientID:   &clientID,
	}
	return req
}
// AddTP registers partition pid of topic topicName on the request.
//
// If the topic is already present, pid is appended to its partition list
// unless it is already there (the call is then a no-op). Otherwise a new
// topic entry holding just pid is appended to Topics.
func (r *ListPartitionReassignmentsRequest) AddTP(topicName string, pid int32) {
	for idx := range r.Topics {
		if r.Topics[idx].Name != topicName {
			continue
		}
		// Topic found: only append the partition when it is not listed yet.
		known := r.Topics[idx].Partitions
		for j := 0; j < len(known); j++ {
			if known[j] == pid {
				return
			}
		}
		r.Topics[idx].Partitions = append(known, pid)
		return
	}

	// First partition for this topic: create its entry (tagged fields stay nil).
	var entry struct {
		Name         string
		Partitions   []int32
		TaggedFields TaggedFields
	}
	entry.Name = topicName
	entry.Partitions = []int32{pid}
	r.Topics = append(r.Topics, entry)
}
// length returns how many bytes Encode must reserve for the request, not
// counting the 4-byte size prefix that Encode adds itself.
//
// The result is an upper bound rather than the exact encoded size: the array
// counts and the topic-name length are budgeted at fixed widths (4 and 2
// bytes) although Encode writes them as uvarints, and Encode trims the unused
// tail. Tagged-field sections are sized from their actual encoding rather
// than assumed to be a single byte, because Encode copies them into the
// buffer and copy silently truncates when the destination is too small.
func (r ListPartitionReassignmentsRequest) length() int {
	requestLength := r.RequestHeader.length()
	requestLength += 4 // TimeoutMS
	requestLength += 4 // len(Topics)
	for _, topic := range r.Topics {
		requestLength += 2 + len(topic.Name)               // len(TopicName) + TopicName
		requestLength += 4                                 // len(Partitions)
		requestLength += 4 * len(topic.Partitions)         // PartitionIDs
		requestLength += len(topic.TaggedFields.Encode())  // per-topic TaggedFields
	}
	requestLength += len(r.TaggedFields.Encode()) // request-level TaggedFields
	return requestLength
}
// Encode serializes the request into a byte slice that starts with a 4-byte
// big-endian size prefix (the number of bytes following the prefix).
//
// Layout after the prefix: request header, TimeoutMS (4 bytes, big-endian),
// the topics array, then the request-level tagged fields. Array counts and the
// topic-name length are written as uvarints holding the real length plus one;
// a nil Topics slice is written as 0, which keeps it distinguishable from an
// empty, non-nil slice (written as 1). Each topic is written as its name, its
// partition IDs (4 bytes each, big-endian) and its tagged fields.
//
// The version argument is not consulted by this implementation.
func (r ListPartitionReassignmentsRequest) Encode(version uint16) []byte {
	const sizePrefix = 4

	// length() is an upper bound; the slice is trimmed to the bytes written.
	buf := make([]byte, r.length()+sizePrefix)

	pos := sizePrefix
	pos += r.RequestHeader.EncodeTo(buf[pos:])

	binary.BigEndian.PutUint32(buf[pos:], uint32(r.TimeoutMS))
	pos += 4

	// nil Topics -> 0; anything else -> len+1.
	var topicCount uint64
	if r.Topics != nil {
		topicCount = uint64(1 + len(r.Topics))
	}
	pos += binary.PutUvarint(buf[pos:], topicCount)

	for i := range r.Topics {
		topic := r.Topics[i]

		pos += binary.PutUvarint(buf[pos:], uint64(1+len(topic.Name)))
		pos += copy(buf[pos:], topic.Name)

		pos += binary.PutUvarint(buf[pos:], uint64(1+len(topic.Partitions)))
		for _, partitionID := range topic.Partitions {
			binary.BigEndian.PutUint32(buf[pos:], uint32(partitionID))
			pos += 4
		}

		pos += copy(buf[pos:], topic.TaggedFields.Encode())
	}

	pos += copy(buf[pos:], r.TaggedFields.Encode())

	// The size prefix covers everything written after the prefix itself.
	binary.BigEndian.PutUint32(buf, uint32(pos-sizePrefix))
	return buf[:pos]
}