-
Notifications
You must be signed in to change notification settings - Fork 8
/
example_counting_streams_bloom_filter.py
76 lines (66 loc) · 2.49 KB
/
example_counting_streams_bloom_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""
Install PyProbables to use this code.
See: https://pyprobables.readthedocs.io/en/latest/code.html#bloomfilter
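A straightforward use of BloomFilter: an IoTPy agent applies the 'add' and
'check' commands arriving on an input stream to a Bloom filter and puts the
result of each 'check' on an output stream.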
"""
from probables import BloomFilter
import sys
sys.path.append("../")
from IoTPy.core.stream import Stream, _no_value, run
from IoTPy.agent_types.op import map_element
from IoTPy.helper_functions.print_stream import print_stream
def bloom_filter_stream(in_stream, out_stream, bloom_filter):
    """
    Create an agent that applies a stream of commands to a Bloom filter.

    Parameters
    ----------
    in_stream: Stream
       The input stream of the agent.
       Each element of the input stream is a pair which
       is either ('add', object) or ('check', object).
       If ('add', z) appears in the input stream then
       z is added to the Bloom filter and nothing is
       put on the output stream for that element.
       If ('check', z) appears then (z, True) appears
       on the output stream if the filter reports that z
       is present, and (z, False) appears otherwise.
    out_stream: Stream
       The output stream of the agent.
       An element is added to the output stream when a 'check'
       function_name appears on the input stream.
       The output stream consists of pairs (object, boolean)
       where boolean is the answer of bloom_filter.check(object)
       at that point in the input stream. A Bloom filter is
       probabilistic, so True can be a false positive.
    bloom_filter: BloomFilter
       An instance of the BloomFilter class. It is mutated by
       every 'add' command.

    Raises
    ------
    ValueError
       Raised by the agent's element function when it processes
       a pair whose function_name is neither 'add' nor 'check'.

    """
    # The function for the map_element agent.
    def func(element):
        function_name, obj = element
        if function_name == 'add':
            bloom_filter.add(obj)
            # An 'add' produces no output element.
            return _no_value
        if function_name == 'check':
            return (obj, bloom_filter.check(obj))
        # Name the offending command so a bad stream element can be
        # diagnosed from the traceback alone.
        raise ValueError(
            "Unknown Bloom filter command {!r}: "
            "expected 'add' or 'check'".format(function_name))

    # Create the map_element agent.
    map_element(func, in_stream, out_stream)
#---------------------------------------------------------------------------
# TEST
#---------------------------------------------------------------------------
def test():
    """
    Demonstrate bloom_filter_stream on two batches of commands.

    Builds a Bloom filter, connects an input stream of
    ('add', x) / ('check', x) commands to an output stream through
    bloom_filter_stream, and attaches print_stream to the output
    stream. Each batch is appended to the input stream and then
    run() is called.

    """
    membership_filter = BloomFilter(
        est_elements=1000, false_positive_rate=0.05)
    commands = Stream('Bloom Filter input')
    answers = Stream('Bloom Filter output')
    bloom_filter_stream(commands, answers, bloom_filter=membership_filter)
    print_stream(answers, answers.name)

    # Test data: the second batch checks values added by the first
    # batch ('a', 'b'), one added within the batch itself ('c'), and
    # one that is never added ('e').
    batches = (
        [('add', 'a'), ('add', 'b'), ('add', 'a'),
         ('check', 'c'), ('add', 'd'), ('check', 'a')],
        [('add', 'c'), ('check', 'b'), ('check', 'a'),
         ('check', 'c'), ('check', 'e'), ('add', 'a')],
    )
    for batch in batches:
        commands.extend(batch)
        run()
# Run the demonstration only when this file is executed as a script,
# so importing the module has no side effects beyond its definitions.
if __name__ == '__main__':
    test()