-
Notifications
You must be signed in to change notification settings - Fork 0
/
zmqGenerator.c
168 lines (135 loc) · 4.27 KB
/
zmqGenerator.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/**
* This is an n-event generator which reads a NeXus file, makes
* an event list from it and sends that using 0MQ.
*
* Mark Koennecke, June 2015
*/
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <zmq.h>
#include "nexus2event.h"
#include "posix_timers.h"
#include "md5.h"
/* #include "config.h" */
/*
 * Count of timer ticks seen so far. It is incremented by timer_func() and
 * polled by the send loop in main(). It is volatile so that the polling
 * loop really re-reads it from memory on every pass instead of working
 * on a value the compiler cached in a register.
 */
static volatile unsigned long pulseID = 0L;

/*
 * Timer callback: advance the pulse counter by one tick.
 * NOTE(review): presumably invoked asynchronously by the posix_timers
 * module once per period - confirm against posix_timers.h.
 */
void timer_func()
{
    pulseID++;
}
int main(int argc, char *argv[])
{
static unsigned int oldPulseID = 0;
char dataHeader[1024], globalHeader[1024];
pNEventArray data = NULL, tmp = NULL;
MD5_CTX md5Context;
unsigned char md5Hash[16];
struct timespec tPulse;
void *zmqContext = NULL;
void *pushSocket = NULL;
char sockAddress[255];
unsigned long nCount = 0, pulseCount = 0;
int64_t byteCount = 0;
time_t statTime;
int64_t rtimestamp[2];
unsigned int multiplier = 1;
int i, rc;
int hwm_value = 2;
if(argc < 3) {
printf("usage:\n\tzmqGenerator nexusfile portNo [multiplier]\n");
return 1;
}
/*
load NeXus file into en event list
*/
data = loadNeXus2Events(argv[1]);
if(data == NULL){
printf("Failed to load NeXus data to events\n");
}
/*
handle multiplier
*/
if(argc > 3){
multiplier = atoi(argv[3]);
tmp = multiplyNEventArray(data,multiplier);
if(tmp == NULL){
printf("Failed to multiply event array by %d\n", multiplier);
} else {
/* killNEventArray(&data); */
data = tmp;
}
}
printf("Sending %ld n-events per message\n", data->count);
/*
create dataHeader
*/
snprintf(dataHeader,sizeof(dataHeader),"{\"htype\":\"sinq-1.0\",\"pid\":925,\"st\":1469096950.708,\"ts\":1706054815,\"tr\":10000,\"ds\":[{\"ts\":32,\"bsy\":1,\"cnt\":1,\"rok\":1,\"gat\":1,\"evt\":4,\"id1\":12,\"id0\":12},%lu],\"hws\":{\"bsy\":0,\"cnt\":1,\"rok\":1,\"gat\":1,\"error\":0,\"full\":0,\"zmqerr\":0,\"lost\":[0,0,0,0,0,0,0,0,0,0]}}",data->count);
MD5Init(&md5Context);
MD5Update(&md5Context,dataHeader,strlen(dataHeader));
MD5Final(md5Hash,&md5Context);
/* printf("%s, hash =%x\n",dataHeader,md5Hash); */
/*
initialize 0MQ
*/
zmqContext = zmq_ctx_new();
// pushSocket = zmq_socket(zmqContext,ZMQ_PUB);
pushSocket = zmq_socket(zmqContext,ZMQ_PUSH);
snprintf(sockAddress,sizeof(sockAddress),"tcp://127.0.0.1:%s",argv[2]);
zmq_bind(pushSocket,sockAddress);
i = zmq_setsockopt(pushSocket,ZMQ_SNDHWM, &hwm_value, sizeof(hwm_value));
/*
start timer
*/
init_timer();
set_periodic_timer(71420); /* 71,42 milliseconds == 14HZ */
statTime = time(NULL);
while(1){
if(oldPulseID != pulseID){
if(pulseID - oldPulseID > 1) {
printf("Timer miss at pulseID %lu\n", pulseID);
}
oldPulseID = pulseID;
/* printf("Timer triggered, pulseID = %ld\n", pulseID); */
/*
create global header
*/
clock_gettime(CLOCK_MONOTONIC,&tPulse);
snprintf(globalHeader,sizeof(globalHeader),
"{\"global_timespamp\": {\"epoch\": %ld, \"ns\": %ld}, \"hash\": \"%p\",\"htype\": \"bsr_m-1.0\",\"pulse_id\": %ld}",
(long)tPulse.tv_sec,(long)tPulse.tv_nsec, md5Hash,pulseID
);
/*
create timestamp
*/
rtimestamp[0] = tPulse.tv_sec;
rtimestamp[1] = tPulse.tv_nsec;
/*
send the stuff away
*/
byteCount += zmq_send(pushSocket,globalHeader,strlen(globalHeader),ZMQ_SNDMORE);
byteCount += zmq_send(pushSocket,dataHeader,strlen(dataHeader),ZMQ_SNDMORE);
byteCount += zmq_send(pushSocket,data->detectorID,data->count*sizeof(int64_t),ZMQ_SNDMORE);
byteCount += zmq_send(pushSocket,rtimestamp,2*sizeof(int64_t), ZMQ_SNDMORE);
byteCount += zmq_send(pushSocket,data->timeStamp,data->count*sizeof(int32_t),ZMQ_SNDMORE);
byteCount += zmq_send(pushSocket,rtimestamp,2*sizeof(int64_t), 0);
/*
handle statistics
*/
nCount += data->count;
pulseCount++;
if(time(NULL) >= statTime + 10){
printf("byteCount = %ld, nCount = %ld, pulseCount = %ld\n", byteCount, nCount, pulseCount);
printf("Sent %f MB/sec , %f n* 10^6/sec, %ld pulses\n", byteCount/(1024.*1024.*10.), nCount/10000000., pulseCount);
pulseCount = 0;
byteCount = 0;
nCount = 0;
statTime = time(NULL);
}
}
}
killNEventArray(&data);
zmq_close(pushSocket);
zmq_ctx_destroy(zmqContext);
return 0;
}