-
Notifications
You must be signed in to change notification settings - Fork 2
/
OPC_UDP_receiver.py
80 lines (63 loc) · 2.34 KB
/
OPC_UDP_receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# pip install pygame
# Quick script generated by ChatGPT to act as a single OPC UDP receiver for testing
import pygame
import socket
import struct
# Configuration
# The display is a WIDTH x HEIGHT grid of logical pixels. Each logical pixel is
# drawn as a SCALE x SCALE square, so the window measures
# (WIDTH * SCALE) x (HEIGHT * SCALE) screen pixels.
SCALE = 8  # Side length, in screen pixels, of the square drawn per logical pixel
WIDTH = 32  # Grid width in logical pixels
HEIGHT = 32  # Grid height in logical pixels
PIXEL_COUNT = WIDTH * HEIGHT  # Number of logical pixels in one full frame
OPC_PORT = 7890  # UDP port to listen on for OPC data
def opc_data_to_pixels(data):
    """Convert raw OPC pixel bytes into a list of ``(r, g, b)`` tuples.

    Args:
        data: An indexable sequence of byte values (e.g. ``bytes``) laid out
            as consecutive 3-byte red/green/blue triplets.

    Returns:
        A list with one ``(r, g, b)`` tuple per complete triplet, in the
        order the triplets appear in ``data``. Trailing bytes that do not
        form a complete triplet are ignored.
    """
    # Stop at the last full triplet so a payload whose length is not a
    # multiple of 3 cannot raise IndexError on the final, partial pixel.
    usable = len(data) - len(data) % 3
    return [
        (data[i], data[i + 1], data[i + 2])
        for i in range(0, usable, 3)
    ]
def main():
    """Open a window and display frames received as OPC packets over UDP.

    Binds a UDP socket to ``OPC_PORT`` on all interfaces and loops until the
    window is closed or the socket reports an error. Each datagram is
    expected to hold one OPC message: a 4-byte header (channel, command,
    big-endian 16-bit payload length) followed by the payload of RGB
    triplets. Only command 0 ("Set Pixel Colors") is handled; datagrams that
    are shorter than the header, or shorter than the length they declare, are
    dropped. Pixels are drawn row-major on a ``WIDTH`` x ``HEIGHT`` grid of
    ``SCALE``-sized squares; pixels beyond the grid are ignored.
    """
    header_size = 4  # channel (1 byte) + command (1 byte) + length (2 bytes)

    pygame.init()
    try:
        screen = pygame.display.set_mode((WIDTH * SCALE, HEIGHT * SCALE))
        pygame.display.set_caption("OPC Display")

        # The context manager closes the socket on every exit path,
        # including unexpected exceptions.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind(('0.0.0.0', OPC_PORT))
            # A short timeout keeps the loop returning to the pygame event
            # queue while no data arrives; a fully blocking recvfrom would
            # freeze the window and make it impossible to close.
            sock.settimeout(0.05)
            print(f"Listening for OPC data on port {OPC_PORT} via UDP...")

            clock = pygame.time.Clock()
            running = True
            while running:
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        running = False
                if not running:
                    break

                try:
                    # Ask for the largest possible datagram so an oversized
                    # frame is received whole rather than truncated (which
                    # would drop it, or raise on some platforms).
                    data, _addr = sock.recvfrom(65535)
                except socket.timeout:
                    continue  # Nothing received yet; poll window events again
                except socket.error:
                    break

                if len(data) < header_size:
                    continue  # Not enough data for a header
                _channel, command, length = struct.unpack(
                    '!BBH', data[:header_size]
                )
                if command != 0:  # Only handle "Set Pixel Colors" command
                    continue
                if len(data) < header_size + length:
                    continue  # Not enough data for the specified length

                # Pass only whole RGB triplets to the converter so a
                # malformed length cannot crash the receiver.
                usable = length - length % 3
                pixels = opc_data_to_pixels(
                    data[header_size:header_size + usable]
                )

                # Redraw the whole frame; anything past the grid is ignored.
                screen.fill((0, 0, 0))
                for i, color in enumerate(pixels[:PIXEL_COUNT]):
                    row, col = divmod(i, WIDTH)
                    pygame.draw.rect(
                        screen,
                        color,
                        (col * SCALE, row * SCALE, SCALE, SCALE),
                    )
                pygame.display.flip()
                clock.tick(60)  # Limit to 60 FPS
    finally:
        pygame.quit()
# Start the receiver only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()