forked from Beastlorion/hubble_exchange_bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_price_feed.t.py
94 lines (81 loc) · 3.71 KB
/
test_price_feed.t.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import asyncio
from price_feeds import PriceFeed
import time
import websockets
from binance import AsyncClient, BinanceSocketManager
import json
import tools
binance_futures_feed_stopped = True
mid_price = 0
mid_price_last_updated_at = 0
async def start_binance_futures_feed(market, frequency, mid_price_streaming_event):
global binance_futures_feed_stopped
symbol = tools.get_symbol_from_name(market) + "USDT"
print(f"Starting Binance Futures price feed for {symbol}...")
task = asyncio.create_task(
subscribe_to_binance_futures_feed(symbol, frequency, mid_price_streaming_event)
)
print("Binance Futures price feed started.")
return task
# ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@depth@100ms"
async def subscribe_to_binance_futures_feed(
symbol, frequency, mid_price_streaming_event
):
global binance_futures_feed_stopped
global mid_price
global mid_price_last_updated_at
print(f"subscribe_to_binance_futures_feed for {symbol}...")
ws_url = f"wss://fstream.binance.com/ws/{symbol.lower()}@bookTicker"
retry_delay = 3 # Initial retry delay in seconds
max_retries = 5 # Maximum number of retries
attempt_count = 0 # Attempt counter
next_timestamp = 0
while True:
try:
async with websockets.connect(ws_url) as websocket:
print("Connected to the server.")
if binance_futures_feed_stopped:
# @todo check if this is the correct way to clear the event
print("setting mid_price_streaming_event")
mid_price_streaming_event.set()
binance_futures_feed_stopped = False
attempt_count = 0 # Reset attempt counter on successful connection
retry_delay = 3 # Reset retry delay on successful connection
while True:
message = await websocket.recv()
data = json.loads(message)
# Check if the data is stale
if next_timestamp - float(data["T"]) / 1000 > 0:
print(f"skipping data: {float(data['T'])/1000}")
continue # discard the data and wait for the next piece
print(
f"data fetched at time: {time.time()} lag of {time.time() - float(data['T'])/1000}"
)
print(f"binance data: {data}")
mid_price = round((float(data["b"]) + float(data["a"])) / 2, 5)
print(f"Mid price: {mid_price}")
mid_price_last_updated_at = time.time()
print(f"sleeping at {time.time()}")
next_timestamp = time.time() + frequency
# await asyncio.sleep(frequency)
print(f"woke up at {time.time()}")
except Exception as e:
if attempt_count >= max_retries:
# @todo check how to bubble the exception
print("Maximum retry attempts reached. Exiting.")
break
mid_price_streaming_event.clear()
binance_futures_feed_stopped = True
print(f"Binance futures feed connection error: {e}")
attempt_count += 1
print(
f"Attempting to reconnect in {retry_delay} seconds... (Attempt {attempt_count}/{max_retries})"
)
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
async def test_price_feed():
mid_price_streaming_event = asyncio.Event()
task = await start_binance_futures_feed("AVAX-USDT", 1, mid_price_streaming_event)
await task
# Start and run until complete
asyncio.run(test_price_feed())