-
Notifications
You must be signed in to change notification settings - Fork 9
/
subscribe.rb
45 lines (40 loc) · 1011 Bytes
/
subscribe.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
require 'pg'
require './state'
# Streams changes from a Materialize SUBSCRIBE cursor and prints the state
# maintained by State (defined in ./state) every time a batch of buffered
# changes is applied.
#
# The host, user and password below are placeholders; replace them with real
# connection details before running the script.
conn = PG.connect(
  host: "MATERIALIZE_HOST",
  port: 6875,
  dbname: "materialize",
  user: "MATERIALIZE_USERNAME",
  password: "MATERIALIZE_PASSWORD",
  sslmode: 'require'
)

begin
  # Open a transaction and declare the cursor that the loop below reads from.
  conn.exec('BEGIN')
  conn.exec('DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS)')

  state = State.new(false)
  # Data rows received since the last time the buffer was applied to `state`.
  buffer = []

  # Loop indefinitely
  loop do
    conn.exec('FETCH c') do |result|
      result.each do |row|
        if row["mz_progressed"] == 't'
          # Progress row: it carries no data of its own. Apply whatever has
          # been buffered so far, stamped with this row's timestamp, and print
          # the result. Nothing is applied or printed when no data arrived
          # since the previous progress row.
          next if buffer.empty?

          state.update(buffer, row["mz_timestamp"].to_i)
          buffer = []
          puts state.get_state
        else
          # Data row: remember the value and its diff until the next progress
          # row triggers the state update.
          buffer.push({ value: { sum: row["sum"] }, diff: row["mz_diff"].to_i })
        end
      end
    end
  end
ensure
  # The loop above only ends through an exception (interrupt, connection
  # error, failure inside State); always release the connection on the way out.
  conn.finish unless conn.finished?
end