forked from coderdj/redax
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.cc
288 lines (249 loc) · 8.97 KB
/
main.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
#include <iostream>
#include <string>
#include <iomanip>
#include <csignal>
#include "V1724.hh"
#include "DAQController.hh"
bool b_run = true;
void SignalHandler(int signum) {
std::cout << "Received signal "<<signum<<std::endl;
b_run = false;
return;
}
int main(int argc, char** argv){
// Need to create a mongocxx instance and it must exist for
// the entirety of the program. So here seems good.
mongocxx::instance instance{};
signal(SIGINT, SignalHandler);
signal(SIGTERM, SignalHandler);
std::string current_run_id="none";
// Accept at least 2 arguments
if(argc<3){
std::cout<<"Welcome to DAX. Run with a unique ID and a valid mongodb URI"<<std::endl;
std::cout<<"e.g. ./dax ID mongodb://user:pass@host:port/authDB"<<std::endl;
std::cout<<"...exiting"<<std::endl;
exit(0);
}
string dbname = "xenonnt";
if(argc >= 4)
dbname = argv[3];
// We will consider commands addressed to this PC's ID
char chostname[HOST_NAME_MAX];
gethostname(chostname, HOST_NAME_MAX);
std::string hostname=chostname;
hostname+= "_reader_";
string sid = argv[1];
hostname += sid;
std::cout<<"Reader starting with ID: "<<hostname<<std::endl;
// MongoDB Connectivity for control database. Bonus for later:
// exception wrap the URI parsing and client connection steps
string suri = argv[2];
mongocxx::uri uri(suri.c_str());
mongocxx::client client(uri);
mongocxx::database db = client[dbname];
mongocxx::collection control = db["control"];
mongocxx::collection status = db["status"];
mongocxx::collection options_collection = db["options"];
// Logging
MongoLog *logger = new MongoLog();
int ret = logger->Initialize(suri, dbname, "log", hostname,
"dac_values", true);
if(ret!=0){
std::cout<<"Exiting"<<std::endl;
exit(-1);
}
//Options
Options *fOptions = NULL;
// The DAQController object is responsible for passing commands to the
// boards and tracking the status
DAQController *controller = new DAQController(logger, hostname);
std::vector<std::thread*> readoutThreads;
// Main program loop. Scan the database and look for commands addressed
// to this hostname.
while(b_run){
// Try to poll for commands
bsoncxx::stdx::optional<bsoncxx::document::value> querydoc;
try{
// mongocxx::cursor cursor = control.find
querydoc = control.find_one
(
bsoncxx::builder::stream::document{} << "host" << hostname << "acknowledged" <<
bsoncxx::builder::stream::open_document << "$ne" << hostname <<
bsoncxx::builder::stream::close_document <<
bsoncxx::builder::stream::finalize
);
}catch(const std::exception &e){
std::cout<<e.what()<<std::endl;
std::cout<<"Can't connect to DB so will continue what I'm doing"<<std::endl;
}
//for(auto doc : cursor) {
if(querydoc){
auto doc = querydoc->view();
std::cout<<"Found a doc with command "<<
doc["command"].get_utf8().value.to_string()<<std::endl;
// Very first thing: acknowledge we've seen the command. If the command
// fails then we still acknowledge it because we tried
control.update_one
(
bsoncxx::builder::stream::document{} << "_id" << (doc)["_id"].get_oid() <<
bsoncxx::builder::stream::finalize,
bsoncxx::builder::stream::document{} << "$push" <<
bsoncxx::builder::stream::open_document << "acknowledged" << hostname <<
bsoncxx::builder::stream::close_document <<
bsoncxx::builder::stream::finalize
);
std::cout<<"Updated doc"<<std::endl;
// Get the command out of the doc
string command = "";
try{
command = (doc)["command"].get_utf8().value.to_string();
}
catch (const std::exception &e){
//LOG
std::stringstream err;
err<<"Received malformed command: "<< bsoncxx::to_json(doc);
logger->Entry(err.str(), MongoLog::Warning);
}
// Process commands
if(command == "start"){
if(controller->status() == 2) {
if(controller->Start()!=0){
continue;
}
// Nested tried cause of nice C++ typing
try{
current_run_id = (doc)["run_identifier"].get_utf8().value.to_string();
}
catch(const std::exception &e){
try{
current_run_id = std::to_string((doc)["run_identifier"].get_int32());
}
catch(const std::exception &e){
current_run_id = "na";
}
}
logger->Entry("Received start command from user "+
(doc)["user"].get_utf8().value.to_string(), MongoLog::Message);
}
else
logger->Entry("Cannot start DAQ since not in ARMED state", MongoLog::Debug);
}
else if(command == "stop"){
// "stop" is also a general reset command and can be called any time
logger->Entry("Received stop command from user "+
(doc)["user"].get_utf8().value.to_string(), MongoLog::Message);
if(controller->Stop()!=0)
logger->Entry("DAQ failed to stop. Will continue clearing program memory.", MongoLog::Error);
current_run_id = "none";
if(readoutThreads.size()!=0){
for(auto t : readoutThreads){
t->join();
delete t;
}
readoutThreads.clear();
}
controller->End();
}
else if(command == "arm"){
// Can only arm if we're in the idle, arming, or armed state
if(controller->status() == 0 || controller->status() == 1 || controller->status() == 2){
// Join readout threads if they still are out there
controller->Stop();
if(readoutThreads.size() !=0){
for(auto t : readoutThreads){
std::cout<<"Joining orphaned readout thread"<<std::endl;
t->join();
delete t;
}
readoutThreads.clear();
}
// Clear up any previously failed things
if(controller->status() != 0)
controller->End();
// Get an override doc from the 'options_override' field if it exists
std::string override_json = "";
try{
bsoncxx::document::view oopts = (doc)["options_override"].get_document().view();
override_json = bsoncxx::to_json(oopts);
}
catch(const std::exception &e){
logger->Entry("No override options provided, continue without.", MongoLog::Debug);
}
bool initialized = false;
// Mongocxx types confusing so passing json strings around
if(fOptions != NULL)
delete fOptions;
fOptions = new Options(logger, (doc)["mode"].get_utf8().value.to_string(),
options_collection, override_json);
std::vector<int> links;
std::map<int, std::vector<u_int16_t>> written_dacs;
if(controller->InitializeElectronics(fOptions, links, written_dacs) != 0){
logger->Entry("Failed to initialize electronics", MongoLog::Error);
controller->End();
}
else{
logger->UpdateDACDatabase(fOptions->GetString("run_identifier", "default"),
written_dacs);
initialized = true;
logger->Entry("Initialized electronics", MongoLog::Debug);
}
if(readoutThreads.size()!=0){
logger->Entry("Cannot start DAQ while readout thread from previous run active. Please perform a reset", MongoLog::Message);
}
else if(!initialized){
cout<<"Skipping readout configuration since init failed"<<std::endl;
}
else{
for(unsigned int i=0; i<links.size(); i++){
std::cout<<"Starting readout thread for link "<<links[i]<<std::endl;
std:: thread *readoutThread = new std::thread
(
DAQController::ReadThreadWrapper,
(static_cast<void*>(controller)), links[i]
);
readoutThreads.push_back(readoutThread);
}
}
}
else
logger->Entry("Cannot arm DAQ while not 'Idle'", MongoLog::Warning);
}
}
// Insert some information on this readout node back to the monitor DB
controller->CheckErrors();
try{
// Gonna have to separate this
// Need function controller->GetDataPerDigi() that returns map by value and clears prv member
// need to put that map into BSON.
auto insert_doc = bsoncxx::builder::stream::document{};
insert_doc << "host" << hostname <<
"rate" << controller->GetDataSize()/1e6 <<
"status" << controller->status() <<
"buffer_length" << controller->buffer_length()/1e6 <<
"run_mode" << controller->run_mode() <<
"current_run_id" << current_run_id <<
"boards" << bsoncxx::builder::stream::open_document <<
[&](bsoncxx::builder::stream::key_context<> doc){
for( auto const& kPair : controller->GetDataPerDigi() )
doc << std::to_string(kPair.first) << kPair.second/1e6;
} << bsoncxx::builder::stream::close_document;
//auto final_doc = insert_doc << bsoncxx::builder::stream::finalize;
status.insert_one(insert_doc << bsoncxx::builder::stream::finalize);
/*status.insert_one(bsoncxx::builder::stream::document{} <<
"host" << hostname <<
"rate" << controller->GetDataSize()/1e6 <<
"status" << controller->status() <<
"buffer_length" << controller->buffer_length()/1e6 <<
"run_mode" << controller->run_mode() <<
"current_run_id" << current_run_id <<
bsoncxx::builder::stream::finalize);
*/
}catch(const std::exception &e){
std::cout<<"Can't connect to DB to update."<<std::endl;
std::cout<<e.what()<<std::endl;
}
usleep(1000000);
}
delete logger;
exit(0);
}