generated from DiamondLightSource/bookshelf-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdaqmessenger.py
100 lines (74 loc) · 2.92 KB
/
daqmessenger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import json
import logging
import time
from collections import deque

import stomp
class DaqScanListener(stomp.ConnectionListener):
    """Listener for frame-style callbacks that buffers decoded messages.

    Each callback receives a single ``frame`` object whose ``body``
    attribute is read. Message bodies are parsed as JSON and appended to
    ``self.queue`` for a consumer to pop from the left.
    """

    def __init__(self):
        # FIFO buffer of JSON-decoded message bodies.
        self.queue = deque()

    def on_error(self, frame):
        """Print the body of an error frame to stdout."""
        text = 'received an error "%s"' % frame.body
        print(text)

    def on_message(self, frame):
        """Decode the frame body as JSON and enqueue the result."""
        decoded = json.loads(frame.body)
        self.queue.append(decoded)
class DaqScanListener4(stomp.ConnectionListener):
    """Listener for ``(headers, message)``-style callbacks.

    Message bodies are parsed as JSON and appended to ``self.queue`` for a
    consumer to pop from the left. ``DaqMessenger.on_scan`` selects this
    listener when the stomp major version is 4.
    """

    def __init__(self):
        # FIFO buffer of JSON-decoded message bodies.
        self.queue = deque()

    def on_error(self, headers, message):
        """Log the body of an error frame.

        The original referenced an undefined module-level ``logger``, which
        raised ``NameError`` whenever an error frame arrived; the logger is
        now looked up by module name at call time.
        """
        logging.getLogger(__name__).error('Received an error "%s"', message)

    def on_message(self, headers, message):
        """Decode ``message`` as JSON and enqueue the result.

        ``headers`` is accepted to match the callback signature but unused.
        """
        self.queue.append(json.loads(message))
class DaqMessenger():
    """Send and receive JSON messages over a stomp connection.

    The class supports two stomp call conventions; which one is used is
    decided once, at construction, from ``stomp.__version__``.
    """

    def __init__(self, beamline, port=61613):
        """Store connection settings; no network activity happens here.

        Args:
            beamline: host name passed to ``stomp.Connection``.
            port: port passed to ``stomp.Connection`` (defaults to the
                previously hard-coded 61613).
        """
        self.beamline = beamline
        self.port = port
        # Created by connect(); defined up front so the attribute always
        # exists and disconnect() can be called safely at any time.
        self.conn = None
        # NOTE(review): only major version 4 is treated as the legacy API;
        # confirm this is right for every stomp release in use.
        self.old_stomp = stomp.__version__[0] == 4

    def connect(self):
        """Create the stomp connection to the configured host and connect."""
        self.conn = stomp.Connection(
            [(self.beamline, self.port)], auto_content_length=False)
        if self.old_stomp:
            # start() is only invoked on the legacy (version 4) API.
            self.conn.start()
        self.conn.connect()

    def disconnect(self):
        """Disconnect; does nothing if ``connect()`` was never called."""
        if self.conn is None:
            return
        self.conn.disconnect()

    def on_scan(self, message_function, sleep=1):
        """Subscribe to scan messages and dispatch them forever.

        Registers a queueing listener, subscribes to
        ``/topic/gda.messages.scan`` and then loops without returning:
        every queued message is passed to ``message_function`` and the
        loop sleeps between polls.

        Args:
            message_function: callable invoked with each JSON-decoded
                message, in arrival order.
            sleep: seconds to sleep between polls of the queue.
        """
        listener = DaqScanListener4() if self.old_stomp else DaqScanListener()
        self.conn.set_listener("scan", listener)
        self.conn.subscribe(
            destination='/topic/gda.messages.scan', id=1, ack='auto')
        pending = listener.queue
        while True:
            # Drain everything that arrived since the last poll.
            while pending:
                message_function(pending.popleft())
            time.sleep(sleep)

    def send_file(self, path):
        """Publish ``{'filePath': path}`` to the file topic."""
        payload = json.dumps({'filePath': path})
        self._send_message('/topic/org.dawnsci.file.topic', payload)

    def send_start(self, path):
        """Publish a STARTED / swmr ENABLED processing message for ``path``."""
        self._send_processing(path, "STARTED", "ENABLED")

    def send_update(self, path):
        """Publish an UPDATED / swmr ACTIVE processing message for ``path``."""
        self._send_processing(path, "UPDATED", "ACTIVE")

    def send_finished(self, path):
        """Publish a FINISHED / swmr ACTIVE processing message for ``path``."""
        self._send_processing(path, "FINISHED", "ACTIVE")

    def send_poni(self, path, status, message):
        """Publish a calibration message to the xrd2 calibration topic.

        Args:
            path: calibration file path, sent as ``calibration_filepath``.
            status: one of ERROR, WARN or OK.
            message: free text sent alongside the status.
        """
        payload = json.dumps(
            {'calibration_filepath': path, "status": status,
             "message": message})
        self._send_message('/topic/gda.messages.calibration.xrd2', payload)

    def _send_processing(self, path, status, swmr_status):
        """Publish one message to the processing topic."""
        payload = json.dumps(
            {'filePath': path, "status": status, "swmrStatus": swmr_status})
        self._send_message('/topic/gda.messages.processing', payload)

    def _send_message(self, destination, message):
        """Send ``message`` to ``destination`` via the active connection.

        The legacy API is called positionally, the newer one with
        ``destination``/``body`` keywords; both pass ``ack='auto'``.
        """
        if self.old_stomp:
            self.conn.send(destination, message, ack='auto')
        else:
            self.conn.send(destination=destination, body=message, ack='auto')