-
Notifications
You must be signed in to change notification settings - Fork 0
/
index_votes.py
139 lines (130 loc) · 4.17 KB
/
index_votes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import logging
import os
import textwrap
from psycopg2.errors import ForeignKeyViolation
import requests
from utils import (
is_archive_node,
PollingProcess,
events_to_process,
new_events_to_process,
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def fetch_votes_by_proposal(height, proposal_id, timeout=30):
    """Return all votes the REST API reports for a group proposal.

    Issues GET requests against
    ``{REGEN_API}/cosmos/group/v1/votes_by_proposal/{proposal_id}`` and
    follows the response's ``pagination.next_key`` until it is empty, so
    proposals with more votes than fit in one page are returned in full.

    Args:
        height: Block height to query at. Only sent (as the
            ``x-cosmos-block-height`` header) when ``is_archive_node()``
            is true; otherwise no extra headers are sent.
        proposal_id: Identifier interpolated into the request path.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``
            so a stalled node cannot block the caller indefinitely.

    Returns:
        A list with the items of every page's ``"votes"`` entry, in the
        order the API returned them.

    Raises:
        KeyError: If ``REGEN_API`` is not set in the environment or a
            response body has no ``"votes"`` key.
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer within ``timeout``.
    """
    if is_archive_node():
        headers = {"x-cosmos-block-height": str(height)}
    else:
        headers = None

    # Currently EventVote only contains the proposal id.
    # Eventually EventVote may contain both proposal id and voter address,
    # at which point a single vote could be fetched with this endpoint:
    # /cosmos/group/v1/vote_by_proposal_voter/{proposal_id}/{voter}
    # Ref: https://github.com/regen-network/indexer/pull/38#discussion_r1310958235
    url = f"{os.environ['REGEN_API']}/cosmos/group/v1/votes_by_proposal/{proposal_id}"

    votes = []
    page_key = None
    while True:
        # The first request carries no pagination parameters, exactly like
        # a plain GET of the endpoint.
        params = {"pagination.key": page_key} if page_key else None
        resp = requests.get(url, headers=headers, params=params, timeout=timeout)
        resp.raise_for_status()
        payload = resp.json()
        votes.extend(payload["votes"])

        next_key = (payload.get("pagination") or {}).get("next_key")
        # Stop on the last page; also stop if the server hands back the key
        # we just sent, which would otherwise loop forever.
        if not next_key or next_key == page_key:
            return votes
        page_key = next_key
def gen_records(cur, query):
    """Run *query* on *cur* and lazily yield each record it produces.

    Being a generator, nothing is executed until the first record is
    requested.

    Args:
        cur: Object exposing ``execute(query)`` and supporting iteration
            over the resulting records.
        query: Statement handed unchanged to ``cur.execute``.

    Yields:
        Each record obtained by iterating over ``cur``.
    """
    cur.execute(query)
    yield from cur
def _index_votes(pg_conn, _client, _chain_num):
    """Write rows to the ``votes`` table for events not yet indexed.

    Steps:
      1. Read ``MAX(block_height)`` from ``votes`` for ``_chain_num``
         (0 when the table has no rows for that chain).
      2. Iterate ``new_events_to_process(cur, "votes", _chain_num, max)``.
         Each yielded ``event`` is treated as a sequence of 9-field rows;
         the shared fields come from the first row and every row adds one
         ``key``/``value`` attribute (surrounding double quotes stripped).
      3. Fetch the proposal's votes via ``fetch_votes_by_proposal`` and
         insert one row per vote with ``ON CONFLICT DO NOTHING``.

    Args:
        pg_conn: Database connection providing ``cursor()`` and
            ``commit()``.
        _client: Unused by this function.
        _chain_num: Chain number used to filter ``votes`` and passed to
            ``new_events_to_process``.

    Raises:
        KeyError: If an event carries no ``proposal_id`` attribute or a
            fetched vote lacks one of the expected keys.
    """
    # The statement text is the same for every row, so build it once per
    # call instead of once per event.
    insert_text = textwrap.dedent(
        """
        INSERT INTO votes (
            type,
            block_height,
            tx_idx,
            msg_idx,
            chain_num,
            timestamp,
            tx_hash,
            proposal_id,
            voter,
            option,
            metadata,
            submit_time
        ) VALUES (
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s,
            %s
        ) ON CONFLICT DO NOTHING;"""
    ).strip("\n")

    with pg_conn.cursor() as cur:
        cur.execute(
            "SELECT MAX(block_height) FROM votes WHERE chain_num = %s", (_chain_num,)
        )
        res = cur.fetchone()
        max_block_height = 0 if res[0] is None else res[0]
        logger.debug(
            "_chain_num=%r max_block_height=%r", _chain_num, max_block_height
        )

        for event in new_events_to_process(
            cur, "votes", _chain_num, max_block_height
        ):
            # Fields shared by every row of the event are taken from the
            # first row; positions 4 and 5 (key/value) are handled below.
            (
                event_type,  # named to avoid shadowing the builtin ``type``
                block_height,
                tx_idx,
                msg_idx,
                _,
                _,
                chain_num,
                timestamp,
                tx_hash,
            ) = event[0]
            normalize = {
                "type": event_type,
                "block_height": block_height,
                "tx_idx": tx_idx,
                "msg_idx": msg_idx,
                "chain_num": chain_num,
                "timestamp": timestamp,
                "tx_hash": tx_hash,
            }
            # Each row contributes one attribute; values arrive wrapped in
            # double quotes, which are stripped before storing.
            for _, _, _, _, key, value, _, _, _ in event:
                normalize[key] = value.strip('"')
            logger.debug(normalize)

            votes = fetch_votes_by_proposal(
                normalize["block_height"], normalize["proposal_id"]
            )
            logger.debug("votes=%r", votes)

            with pg_conn.cursor() as insert_cur:
                for vote in votes:
                    row = (
                        normalize["type"],
                        normalize["block_height"],
                        normalize["tx_idx"],
                        normalize["msg_idx"],
                        normalize["chain_num"],
                        normalize["timestamp"],
                        normalize["tx_hash"],
                        normalize["proposal_id"],
                        vote["voter"],
                        vote["option"],
                        vote["metadata"],
                        vote["submit_time"],
                    )
                    insert_cur.execute(insert_text, row)
                    logger.debug(insert_cur.statusmessage)
                    # NOTE(review): each row is committed on its own, so a
                    # failure part-way through leaves earlier rows of the
                    # same event committed - confirm this is intended.
                    pg_conn.commit()
                    logger.info("vote inserted..")
def index_votes():
    """Create a ``PollingProcess`` targeting ``_index_votes`` and start it.

    The process is constructed with ``sleep_secs=1``; how that value is
    used is defined by ``PollingProcess`` itself.
    """
    PollingProcess(target=_index_votes, sleep_secs=1).start()