-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
126 lines (100 loc) · 4.07 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import asyncio
import argparse
import pprint
import logging
from screenlogicpy import ScreenLogicGateway, discovery
import prometheus_client
from prometheus_client.core import GaugeMetricFamily, StateSetMetricFamily, CounterMetricFamily
from prometheus_client.registry import Collector
# Configure root logging at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Remove the GC, platform and process collectors from the default registry.
for _builtin_collector in (
    prometheus_client.GC_COLLECTOR,
    prometheus_client.PLATFORM_COLLECTOR,
    prometheus_client.PROCESS_COLLECTOR,
):
    prometheus_client.REGISTRY.unregister(_builtin_collector)
class CustomCollector(Collector):
    """Prometheus collector that reads metrics from a ScreenLogic gateway.

    The screenlogicpy calls used here are coroutines, while ``collect()`` is
    a synchronous generator, so the instance owns a private asyncio event
    loop and drives every coroutine with ``run_until_complete``.

    Attributes:
        loop: Event loop created in ``__init__`` and used for all async calls.
        gateway: The connected ``ScreenLogicGateway``, or ``None`` while no
            gateway could be discovered.
    """

    # NOTE(review): ``run_until_complete`` on one shared loop is not safe if
    # ``collect()`` can be entered from several threads at once - confirm how
    # the HTTP server dispatches concurrent scrapes.

    def __init__(self):
        """Create the private event loop and attempt the first connection."""
        self.loop = asyncio.new_event_loop()
        # May be None when discovery finds nothing; async_collect() retries.
        self.gateway = self.loop.run_until_complete(self.connect())

    async def connect(self):
        """Discover gateways and connect to the first one returned.

        Returns:
            A connected ``ScreenLogicGateway``, or ``None`` (after logging a
            warning) when discovery returns no hosts.
        """
        logger.info("Initializing gateway...")
        hosts = await discovery.async_discover()
        if not hosts:
            logger.warning("No gateways found")
            return None
        logger.info("Connecting to host %s", hosts[0])
        gateway = ScreenLogicGateway()
        await gateway.async_connect(**hosts[0])
        return gateway

    async def _reconnect(self):
        """Drop the current gateway, connect again and refresh its data.

        ``self.gateway`` is cleared first so that a failure inside
        ``connect()`` leaves the collector in the "no gateway" state and the
        next scrape simply retries. Errors raised by ``connect()`` or by the
        refresh of the new gateway propagate to the caller.
        """
        stale = self.gateway
        self.gateway = None
        if stale is not None:
            # Best effort: the old connection is presumably already broken,
            # so a failure to close it must not prevent the reconnect.
            try:
                await stale.async_disconnect()
            except Exception:
                logger.debug('Ignoring error while disconnecting stale gateway', exc_info=True)
        self.gateway = await self.connect()
        if self.gateway is not None:
            await self.gateway.async_update()

    def collect(self):
        """Yield the current metric families (synchronous Collector hook)."""
        logger.info("Collecting metrics...")
        yield from self.loop.run_until_complete(self.async_collect())

    async def async_collect(self):
        """Refresh the gateway data and build every metric family.

        If there is no gateway yet, or refreshing the current one fails, one
        reconnect is attempted. When no gateway can be found the scrape
        yields no metrics instead of failing on a ``None`` gateway.

        Returns:
            A list of metric families; empty when no gateway is connected.
        """
        if self.gateway is None:
            logger.info('Trying to reconnect...')
            await self._reconnect()
        else:
            try:
                await self.gateway.async_update()
            except Exception:
                logger.exception('Failed to update gateway')
                logger.info('Trying to reconnect...')
                await self._reconnect()

        if self.gateway is None:
            logger.warning('No gateway connected; returning no metrics')
            return []

        return [
            *self.collect_misc(),
            *self.collect_body(),
            *self.collect_pump(),
            *self.collect_circuit(),
        ]

    def collect_misc(self):
        """Return gauges for the controller's air temperature and freeze mode."""
        temp = GaugeMetricFamily('screenlogic_air_temp', 'Air temperature')
        temp.add_metric([], self.gateway.get_data('controller', 'sensor', 'air_temperature', 'value'))

        freeze_mode = GaugeMetricFamily('screenlogic_freeze_mode', 'Freeze mode')
        freeze_mode.add_metric([], self.gateway.get_data('controller', 'sensor', 'freeze_mode', 'value'))

        return [temp, freeze_mode]

    def collect_circuit(self):
        """Return one gauge family with a sample per circuit, labelled by name.

        Circuits whose name starts with ``Feature`` are skipped.
        """
        circuit_state = GaugeMetricFamily('screenlogic_circuit_state', 'Circuit state', labels=['name'])
        for circuit in self.gateway.get_data('circuit').values():
            if circuit['name'].startswith('Feature'):
                continue
            circuit_state.add_metric([circuit['name']], circuit['value'])
        return [circuit_state]

    def collect_body(self):
        """Return temperature, setpoint and heat state/mode per body of water.

        Every family is labelled with the body's ``name``. The heat state and
        heat mode are exported as state sets: each entry of ``enum_options``
        becomes a state that is true when its index equals the current value.
        """
        temp = GaugeMetricFamily('screenlogic_body_temp', 'Body temperature', labels=['name'])
        heat_setpoint = GaugeMetricFamily('screenlogic_body_heat_setpoint', 'Body temperature setpoint', labels=['name'])
        heat_state = StateSetMetricFamily('screenlogic_body_heat_state', 'Body heat state', labels=['name'])
        heat_mode = StateSetMetricFamily('screenlogic_body_heat_mode', 'Body heat mode', labels=['name'])

        for body in self.gateway.get_data('body').values():
            labels = [body['name']]
            temp.add_metric(labels, body['last_temperature']['value'])
            heat_setpoint.add_metric(labels, body['heat_setpoint']['value'])
            heat_state.add_metric(labels, {
                val: body['heat_state']['value'] == index
                for index, val in enumerate(body['heat_state']['enum_options'])
            })
            heat_mode.add_metric(labels, {
                val: body['heat_mode']['value'] == index
                for index, val in enumerate(body['heat_mode']['enum_options'])
            })

        return [temp, heat_setpoint, heat_state, heat_mode]

    def collect_pump(self):
        """Return watts, rpm and gpm gauges for the pump at index 0."""
        pump = self.gateway.get_data('pump', 0)
        metrics = []
        for i in ['watts', 'rpm', 'gpm']:
            gauge = GaugeMetricFamily(f'screenlogic_pump_{i}', f'Pump {i}')
            gauge.add_metric([], pump[f'{i}_now']['value'])
            metrics.append(gauge)
        return metrics
# Command-line interface: the only option is the port to listen on.
parser = argparse.ArgumentParser(description="Start the exporter")
parser.add_argument(
    '-p', '--port',
    type=int,
    default=9199,
    help='The port to start the server on.',
)
args = parser.parse_args()

# Register the collector, start the HTTP server, then wait on the thread
# object that start_http_server returned.
collector = CustomCollector()
prometheus_client.REGISTRY.register(collector)
server, server_thread = prometheus_client.start_http_server(args.port)
server_thread.join()