-
Notifications
You must be signed in to change notification settings - Fork 27
/
frontiers_websockets.py
158 lines (137 loc) · 5.96 KB
/
frontiers_websockets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Nano Telegram bot
# @NanoWalletBot https://t.me/NanoWalletBot
#
# Source code:
# https://github.com/SergiySW/NanoWalletBot
#
# Released under the BSD 3-Clause License
#
"""
Usage:
Press Ctrl-C on the command line or send a signal to the process to stop the server.
"""
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters
from telegram import Bot, ParseMode
from telegram.error import BadRequest
from telegram.utils.request import Request
import logging
import json
import time, math
import asyncio, websockets
# Parse config
from six.moves import configparser
config = configparser.ConfigParser()
config.read('bot.cfg')
log_file_frontiers = config.get('main', 'log_file_frontiers')
large_amount_warning = int(config.get('main', 'large_amount_warning'))
ws_url = config.get('main', 'ws_url')
# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO, filename=log_file_frontiers)
logging.getLogger("requests").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
hash_url = 'https://nanocrawler.cc/explorer/block/'
# MySQL requests
from common_mysql import *
# Request to node
from common_rpc import *
# Frontiers functions
from common_sender import *
# Common functions
from common import push, mrai_text, bot_start
# Translation
with open('language.json') as lang_file:
language = json.load(lang_file)
def lang_text(text_id, lang_id):
try:
return language[lang_id][text_id]
except KeyError:
return language['en'][text_id]
def subscription(topic: str, ack: bool=False, options: dict=None):
d = {"action": "subscribe", "topic": topic, "ack": ack}
if options is not None:
d["options"] = options
return d
async def websockets_receive():
async with websockets.connect(ws_url) as websocket:
bot = bot_start()
await websocket.send(json.dumps(subscription("confirmation", options={"include_election_info": "false", "include_block":"true", "all_local_accounts": "true"}, ack=True)))
connect = json.loads(await websocket.recv()) # ack
print(connect)
logging.info(connect)
while 1:
rec = json.loads(await websocket.recv())
topic = rec.get("topic", None)
if topic:
item = rec["message"]
if topic == "confirmation":
item['type'] = item['block']['subtype']
if (item['type'] == 'receive' or item['type'] == 'change' or item['type'] == 'epoch'):
balance = int(math.floor(int(item['block']['balance']) / (10 ** 24)))
account_mysql = mysql_select_by_account(item['account'])
extra = False
if (account_mysql is False):
account_mysql = mysql_select_by_account_extra(item['account'])
extra = True
if (account_mysql[2] != item['hash']): # Compare frontiers
if (extra):
mysql_update_frontier_extra(item['account'], item['hash'])
else:
mysql_update_frontier(item['account'], item['hash'])
if (item['type'] == 'receive'): # Receive blocks
sender_account = rpc({"action": "block_info", "hash": item['block']['link']}, 'block_account')
lang_id = mysql_select_language(account_mysql[0])
#logging.info('{0} --> {1} {2}'.format(mrai_text(account_mysql[3]), mrai_text(balance), item['hash']))
sender = find_sender (item, account_mysql, sender_account, balance, lang_text)
received_amount = int(math.floor(int(item['amount']) / (10 ** 24)))
text = lang_text('frontiers_receive', lang_id).format(mrai_text(received_amount), mrai_text(balance), mrai_text(0), item['hash'], hash_url, sender)
try:
push(bot, account_mysql[0], text)
except BadRequest as e:
logging.exception('Bad request account {0}'.format(account_mysql[0]))
logging.info('{0} Nano (XRB) received by {1}, hash: {2}'.format(mrai_text(received_amount), account_mysql[0], item['hash']))
#print(text)
# Large amount check
if (received_amount >= large_amount_warning):
time.sleep(0.1)
try:
push(bot, account_mysql[0], lang_text('frontiers_large_amount_warning', lang_id))
except BadRequest as e:
logging.exception('Bad request account {0}'.format(account_mysql[0]))
elif (item['type'] == 'change'): # Change blocks
logging.warning('Change block {0} for account {1}'.format(item['hash'], item['account']))
if (balance != int(account_mysql[3])):
logging.error('Balance change: {0} --> {1}'.format(mrai_text(balance), mrai_text(int(account_mysql[3]))))
elif (item['type'] == 'epoch'): # Epoch blocks
logging.warning('Epoch block {0} for account {1}'.format(item['hash'], item['account']))
if (balance != int(account_mysql[3])):
logging.error('Balance change: {0} --> {1}'.format(mrai_text(balance), mrai_text(int(account_mysql[3]))))
# Previous block check
previous_zero = (item['block']['previous'] == '0000000000000000000000000000000000000000000000000000000000000000' and account_mysql[2] is None)
if ((previous_zero is False) and item['block']['previous'] != account_mysql[2]):
logging.error('Mismatch for previous block. Expected {0}, received {1}'.format(account_mysql[2], item['block']['previous']))
if (item['type'] == 'receive'):
time.sleep(0.1)
try:
push(bot, account_mysql[0], 'Please check received block in explorer, you can receive notification for some older block')
except BadRequest as e:
logging.exception('Bad request account {0}'.format(account_mysql[0]))
else:
logging.warning('Unexpected WebSockets message: {0}'.format(json.dumps(item)))
stopped = False
while (stopped is False):
try:
asyncio.get_event_loop().run_until_complete(websockets_receive())
except KeyboardInterrupt:
stopped = True
pass
except ConnectionRefusedError:
logging.warning("Error connecting to websocket server")
print("Error connecting to websocket server")
time.sleep(0.5)
except Exception as e:
logging.exception("message")
time.sleep(30)