forked from evermarkets/market-maker-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
engine.py
83 lines (65 loc) · 2.69 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import asyncio
import traceback
from strategy.market_maker import market_maker
from gateways.emx.adapter import emx_adapter
from logger import logging
strategies_factory = {
"market_maker": market_maker,
}
class Engine:
def __init__(self, cfg):
self.logger = logging.getLogger()
self.is_active = True
self.exchange_adapter = emx_adapter(cfg.adapter)
try:
strategy_name = cfg.strategy.name
except AttributeError:
self.logger.exception("strategy was not found")
raise Exception("strategy was not found")
try:
self.strategy = strategies_factory[strategy_name](cfg.strategy, self.exchange_adapter)
except KeyError:
self.logger.exception("strategy was not found in a factory")
raise Exception("strategy was not found in a factory")
async def listen_updates(self):
while self.is_active:
try:
await self.exchange_adapter.listen()
except Exception as err:
self.logger.info("listen error: {}".format(err))
await self.strategy.handle_exception(str(err))
self.logger.warning("listen_updates was stopped")
async def run_strategy(self):
while self.is_active:
try:
await self.strategy.run()
except Exception as err:
try:
await self.strategy.handle_exception(str(err))
except Exception as err:
self.logger.warning("run_strategy handle_exception failed on {}".format(err))
await asyncio.sleep(0.1)
self.logger.warning("run_strategy was stopped")
def run(self):
self.logger.info("Engine started")
loop = asyncio.get_event_loop()
def handle_async_exception(loop, ctx):
try:
self.logger.error("Exception in async task: {0}".format(ctx["exception"]))
except KeyError:
self.logger.error("Exception in async task: {0}".format(ctx))
stack_str = traceback.format_exc()
self.logger.error("Current traceback: {}".format(stack_str))
for line in traceback.format_stack():
self.logger.error(line.strip())
loop.set_exception_handler(handle_async_exception)
asyncio.ensure_future(self.exchange_adapter.start())
asyncio.ensure_future(self.listen_updates())
asyncio.ensure_future(self.run_strategy())
loop.set_debug(enabled=True)
loop.slow_callback_duration = 0.05
loop.run_forever()
loop.close()
self.logger.info("Engine stopped")
def stop(self):
self.subscriptions.stop()