forked from reo7sp/go-tarantool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscribe.go
114 lines (95 loc) · 2.29 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package tarantool
import (
"github.com/tinylib/msgp/msgp"
)
// Subscribe is the SUBSCRIBE command.
//
// Its fields are serialized by MarshalMsg into a single MessagePack map.
type Subscribe struct {
	// UUID is written as a string under KeyInstanceUUID.
	UUID string
	// ReplicaSetUUID is written as a string under KeyReplicaSetUUID.
	ReplicaSetUUID string
	// VClock is written as a nested map under KeyVClock.
	VClock VectorClock
	// Anon, when true, adds a KeyReplicaAnon entry with the value true to
	// the request; when false the entry is omitted entirely.
	Anon bool
}

// Compile-time check that *Subscribe satisfies the Query interface.
var _ Query = (*Subscribe)(nil)
// GetCommandID returns SubscribeCommand, the command identifier of this query.
func (q *Subscribe) GetCommandID() uint {
	return SubscribeCommand
}
// MarshalMsg implements msgp.Marshaler.
//
// It appends the request body to b as one MessagePack map and returns the
// extended slice. The map always carries KeyInstanceUUID, KeyReplicaSetUUID
// and KeyVClock; when q.Anon is set it is preceded by a KeyReplicaAnon entry
// whose value is true. The returned error is always nil.
func (q *Subscribe) MarshalMsg(b []byte) (o []byte, err error) {
	// Three entries are always present; the Anon flag contributes a fourth.
	size := uint32(3)
	if q.Anon {
		size++
	}
	o = msgp.AppendMapHeader(b, size)

	// The Anon entry, if any, goes first so the key order on the wire stays
	// the same as before.
	if q.Anon {
		o = msgp.AppendBool(msgp.AppendUint(o, KeyReplicaAnon), true)
	}

	o = msgp.AppendString(msgp.AppendUint(o, KeyInstanceUUID), q.UUID)
	o = msgp.AppendString(msgp.AppendUint(o, KeyReplicaSetUUID), q.ReplicaSetUUID)

	// The vector clock is emitted as a nested map with one entry per element
	// yielded by ranging over q.VClock.
	o = msgp.AppendUint(o, KeyVClock)
	o = msgp.AppendMapHeader(o, uint32(len(q.VClock)))
	for k, v := range q.VClock {
		o = msgp.AppendUint(o, uint(k))
		o = msgp.AppendUint64(o, v)
	}

	return o, nil
}
// UnmarshalMsg implements msgp.Unmarshaler.
//
// Decoding is not implemented for Subscribe: the input is ignored and the
// call always returns a nil slice together with ErrNotSupported.
func (q *Subscribe) UnmarshalMsg([]byte) (buf []byte, err error) {
	return nil, ErrNotSupported
}
// SubscribeResponse holds the fields that UnmarshalMsg extracts from the body
// of a response message.
type SubscribeResponse struct {
	// ReplicaSetUUID is the string found under KeyReplicaSetUUID.
	ReplicaSetUUID string
	// VClock is rebuilt from the map found under KeyVClock.
	VClock VectorClock
}
// UnmarshalMsg implements msgp.Unmarshaler.
//
// data must start with two consecutive MessagePack objects: a header, which
// is skipped without inspection, and a body map keyed by unsigned integers.
// From the body, KeyReplicaSetUUID populates sr.ReplicaSetUUID and KeyVClock
// replaces sr.VClock with a fresh vector clock fed entry by entry through
// Follow; values under any other key are skipped.
//
// It returns the slice handed back by the last msgp call together with any
// decoding error. ErrVectorClock is returned when Follow rejects an entry.
// Fields assigned before an error occurred are left as they are.
func (sr *SubscribeResponse) UnmarshalMsg(data []byte) (buf []byte, err error) {
	// The leading object is the binary header; nothing in it is needed here.
	buf, err = msgp.Skip(data)
	if err != nil {
		return buf, err
	}

	// The body follows as a map.
	var fields uint32
	fields, buf, err = msgp.ReadMapHeaderBytes(buf)
	if err != nil {
		return buf, err
	}

	for i := uint32(0); i < fields; i++ {
		var code uint
		code, buf, err = msgp.ReadUintBytes(buf)
		if err != nil {
			return buf, err
		}

		switch code {
		case KeyReplicaSetUUID:
			var uuid string
			uuid, buf, err = msgp.ReadStringBytes(buf)
			if err != nil {
				return buf, err
			}
			sr.ReplicaSetUUID = uuid

		case KeyVClock:
			var entries uint32
			entries, buf, err = msgp.ReadMapHeaderBytes(buf)
			if err != nil {
				return buf, err
			}

			// Start from an empty clock so that entries from an earlier
			// decode do not leak into this one.
			sr.VClock = NewVectorClock()

			var (
				id  uint32
				lsn uint64
			)
			for j := uint32(0); j < entries; j++ {
				id, buf, err = msgp.ReadUint32Bytes(buf)
				if err != nil {
					return buf, err
				}
				lsn, buf, err = msgp.ReadUint64Bytes(buf)
				if err != nil {
					return buf, err
				}
				if !sr.VClock.Follow(id, lsn) {
					return buf, ErrVectorClock
				}
			}

		default:
			// Unknown key: step over its value to stay aligned with the
			// next key.
			buf, err = msgp.Skip(buf)
			if err != nil {
				return buf, err
			}
		}
	}

	return buf, nil
}