-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathcuckoofilter.go
314 lines (275 loc) · 7.8 KB
/
cuckoofilter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
/*
* Copyright (C) linvon
* Date 2021/2/18 10:29
*/
package cuckoo
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"github.com/dgryski/go-metro"
)
// maximum number of cuckoo kicks before claiming failure
const kMaxCuckooCount uint = 500
const (
// TableTypeSingle normal single table
TableTypeSingle = 0
// TableTypePacked packed table, use semi-sort to save 1 bit per item
TableTypePacked = 1
)
type table interface {
Init(tagsPerBucket, bitsPerTag, num uint, initialBucketsHint []byte) error
NumBuckets() uint
FindTagInBuckets(i1, i2 uint, tag uint32) bool
DeleteTagFromBucket(i uint, tag uint32) bool
InsertTagToBucket(i uint, tag uint32, kickOut bool, oldTag *uint32) bool
SizeInTags() uint
SizeInBytes() uint
Info() string
BitsPerItem() uint
Reader() (io.Reader, uint)
Decode([]byte) error
Reset()
}
func getTable(tableType uint) interface{} {
switch tableType {
case TableTypePacked:
return NewPackedTable()
default:
return NewSingleTable()
}
}
type victimCache struct {
index uint
tag uint32
used bool
}
const filterMetadataSize = 3*bytesPerUint32 + 1
// Filter cuckoo filter type struct
type Filter struct {
victim victimCache
numItems uint
table table
}
//NewFilter return a new initialized filter
/*
tagsPerBucket: num of tags for each bucket, which is b in paper. tag is fingerprint, which is f in paper.
bitPerItem: num of bits for each item, which is length of tag(fingerprint)
maxNumKeys: num of keys that filter will store. this value should close to and lower
nextPow2(maxNumKeys/tagsPerBucket) * maxLoadFactor. cause table.NumBuckets is always a power of two
*/
func NewFilter(tagsPerBucket, bitsPerItem, maxNumKeys, tableType uint) *Filter {
numBuckets := getNextPow2(uint64(maxNumKeys / tagsPerBucket))
if float64(maxNumKeys)/float64(numBuckets*tagsPerBucket) > maxLoadFactor(tagsPerBucket) {
numBuckets <<= 1
}
if numBuckets == 0 {
numBuckets = 1
}
table := getTable(tableType).(table)
_ = table.Init(tagsPerBucket, bitsPerItem, numBuckets, nil)
return &Filter{
table: table,
}
}
func (f *Filter) indexHash(hv uint32) uint {
// table.NumBuckets is always a power of two, so modulo can be replaced with bitwise-and:
return uint(hv) & (f.table.NumBuckets() - 1)
}
func (f *Filter) tagHash(hv uint32) uint32 {
return hv%((1<<f.table.BitsPerItem())-1) + 1
}
func (f *Filter) generateIndexTagHash(item []byte) (index uint, tag uint32) {
hash := metro.Hash64(item, 1337)
index = f.indexHash(uint32(hash >> 32))
tag = f.tagHash(uint32(hash))
return
}
func (f *Filter) altIndex(index uint, tag uint32) uint {
// 0x5bd1e995 is the hash constant from MurmurHash2
return f.indexHash(uint32(index) ^ (tag * 0x5bd1e995))
}
// Size return num of items that filter store
func (f *Filter) Size() uint {
var c uint
if f.victim.used {
c = 1
}
return f.numItems + c
}
// LoadFactor return current filter's loadFactor
func (f *Filter) LoadFactor() float64 {
return 1.0 * float64(f.Size()) / float64(f.table.SizeInTags())
}
// SizeInBytes return bytes occupancy of filter's table
func (f *Filter) SizeInBytes() uint {
return f.table.SizeInBytes()
}
// BitsPerItem return bits occupancy per item of filter's table
func (f *Filter) BitsPerItem() float64 {
return 8.0 * float64(f.table.SizeInBytes()) / float64(f.Size())
}
// Add add an item into filter, return false when filter is full
func (f *Filter) Add(item []byte) bool {
if f.victim.used {
return false
}
i, tag := f.generateIndexTagHash(item)
return f.addImpl(i, tag)
}
// AddUnique add an item into filter, return false when filter already contains it or filter is full
func (f *Filter) AddUnique(item []byte) bool {
if f.Contain(item) {
return false
}
return f.Add(item)
}
func (f *Filter) addImpl(i uint, tag uint32) bool {
curIndex := i
curTag := tag
var oldTag uint32
var count uint
var kickOut bool
for count = 0; count < kMaxCuckooCount; count++ {
kickOut = count > 0
oldTag = 0
if f.table.InsertTagToBucket(curIndex, curTag, kickOut, &oldTag) {
f.numItems++
return true
}
if kickOut {
curTag = oldTag
}
curIndex = f.altIndex(curIndex, curTag)
}
f.victim.index = curIndex
f.victim.tag = curTag
f.victim.used = true
return true
}
// Contain return if filter contains an item
func (f *Filter) Contain(key []byte) bool {
i1, tag := f.generateIndexTagHash(key)
i2 := f.altIndex(i1, tag)
hit := f.victim.used && tag == f.victim.tag && (i1 == f.victim.index || i2 == f.victim.index)
if hit || f.table.FindTagInBuckets(i1, i2, tag) {
return true
}
return false
}
// Delete delete item from filter, return false when item not exist
func (f *Filter) Delete(key []byte) bool {
i1, tag := f.generateIndexTagHash(key)
i2 := f.altIndex(i1, tag)
if f.table.DeleteTagFromBucket(i1, tag) || f.table.DeleteTagFromBucket(i2, tag) {
f.numItems--
goto TryEliminateVictim
} else if f.victim.used && tag == f.victim.tag && (i1 == f.victim.index || i2 == f.victim.index) {
f.victim.used = false
return true
} else {
return false
}
TryEliminateVictim:
if f.victim.used {
f.victim.used = false
i := f.victim.index
tag = f.victim.tag
f.addImpl(i, tag)
}
return true
}
// Reset reset the filter
func (f *Filter) Reset() {
f.table.Reset()
f.numItems = 0
f.victim.index = 0
f.victim.tag = 0
f.victim.used = false
}
// FalsePositiveRate return the False Positive Rate of filter
// Notice that this will reset filter
func (f *Filter) FalsePositiveRate() float64 {
n1 := make([]byte, 4)
f.Reset()
n := f.table.SizeInTags()
for i := uint32(0); i < uint32(n); i++ {
binary.BigEndian.PutUint32(n1, i)
f.Add(n1)
}
var rounds uint32 = 100000
fp := 0
for i := uint32(0); i < rounds; i++ {
binary.BigEndian.PutUint32(n1, i+uint32(n)+1)
if f.Contain(n1) {
fp++
}
}
f.Reset()
return float64(fp) / float64(rounds)
}
// Info return filter's detail info
func (f *Filter) Info() string {
return fmt.Sprintf("CuckooFilter Status:\n"+
"\t\t%v\n"+
"\t\tKeys stored: %v\n"+
"\t\tLoad factor: %v\n"+
"\t\tHashtable size: %v KB\n"+
"\t\tbit/key: %v\n",
f.table.Info(), f.Size(), f.LoadFactor(), f.table.SizeInBytes()>>10, f.BitsPerItem())
}
// Encode returns a byte slice representing a Cuckoo filter
func (f *Filter) Encode() ([]byte, error) {
filterReader, filterSize := f.EncodeReader()
buf := make([]byte, filterSize)
if _, err := io.ReadFull(filterReader, buf); err != nil {
return nil, err
}
return buf, nil
}
// EncodeReader returns a reader representing a Cuckoo filter
func (f *Filter) EncodeReader() (io.Reader, uint) {
var metadata [filterMetadataSize]byte
for i, n := range []uint32{uint32(f.numItems), uint32(f.victim.index), f.victim.tag} {
binary.LittleEndian.PutUint32(metadata[i*bytesPerUint32:], n)
}
victimUsed := byte(0)
if f.victim.used {
victimUsed = byte(1)
}
metadata[bytesPerUint32*3] = victimUsed
tableReader, tableEncodedSize := f.table.Reader()
return io.MultiReader(bytes.NewReader(metadata[:]), tableReader), uint(len(metadata)) + tableEncodedSize
}
// Decode returns a Cuckoo Filter using a copy of the provided byte slice.
func Decode(b []byte) (*Filter, error) {
copiedBytes := make([]byte, len(b))
copy(copiedBytes, b)
return DecodeFrom(copiedBytes)
}
// DecodeFrom returns a Cuckoo Filter using the exact provided byte slice (no copy).
func DecodeFrom(b []byte) (*Filter, error) {
if len(b) < 20 {
return nil, errors.New("unexpected bytes length")
}
numItems := uint(binary.LittleEndian.Uint32(b[0*bytesPerUint32:]))
curIndex := uint(binary.LittleEndian.Uint32(b[1*bytesPerUint32:]))
curTag := binary.LittleEndian.Uint32(b[2*1*bytesPerUint32:])
used := b[12] == byte(1)
tableType := uint(b[13])
table := getTable(tableType).(table)
if err := table.Decode(b[13:]); err != nil {
return nil, err
}
return &Filter{
table: table,
numItems: numItems,
victim: victimCache{
index: curIndex,
tag: curTag,
used: used,
},
}, nil
}