// StraxFormatter.hh
// Forked from coderdj/redax.
#ifndef _STRAXINSERTER_HH_
#define _STRAXINSERTER_HH_
#include <cstdlib>
#include <cstdint>
#include <string>
#include <map>
#include <mutex>
#include <experimental/filesystem>
#include <numeric>
#include <atomic>
#include <vector>
#include <thread>
#include <condition_variable>
#include <list>
#include <memory>
#include <string_view>
#include <chrono>
#include <functional>
class Options;
class MongoLog;
class V1724;

/// A move-only bundle of one raw data buffer plus the bookkeeping values that
/// travel with it.
///
/// Copying is disabled; packets are handed around by move (typically inside a
/// std::unique_ptr). All special members are compiler-generated, so a move
/// transfers both `buff` and `digi` instead of copying the shared_ptr, and
/// both move operations are noexcept.
///
/// After a move, the source packet's `digi` is null and its `buff` is in a
/// valid but unspecified state; `clock_counter` and `header_time` are simply
/// copied.
struct data_packet {
  /// Creates an empty packet: empty buffer, zeroed counters, no digitizer.
  data_packet() = default;

  /// Creates a packet that takes ownership of `s`.
  /// @param s   raw buffer contents (moved into `buff`)
  /// @param ht  value stored in `header_time`
  /// @param cc  value stored in `clock_counter`
  /// `digi` is left null and must be assigned by the caller if needed.
  data_packet(std::u32string s, uint32_t ht, long cc)
      : buff(std::move(s)), clock_counter(cc), header_time(ht) {}

  data_packet(const data_packet& rhs) = delete;
  data_packet& operator=(const data_packet& rhs) = delete;

  // Defaulted moves transfer every member, including the shared_ptr, so no
  // atomic reference-count traffic happens when a packet changes hands.
  data_packet(data_packet&& rhs) noexcept = default;
  data_packet& operator=(data_packet&& rhs) noexcept = default;

  // Members release their own resources; nothing to do by hand.
  ~data_packet() = default;

  std::u32string buff;             ///< raw payload, stored as 32-bit units
  long clock_counter{0};           ///< NOTE(review): presumably a clock rollover count — confirm with the producer
  uint32_t header_time{0};         ///< NOTE(review): presumably a timestamp taken from the data header — confirm
  std::shared_ptr<V1724> digi;     ///< digitizer associated with this packet; may be null
};
/// Reformats raw data into strax format.
///
/// Only GetBufferSize() is defined in this header. Every other member
/// function is defined elsewhere, so the declarations below must stay in
/// sync with those definitions.
class StraxFormatter{
public:
  StraxFormatter(std::shared_ptr<Options>&, std::shared_ptr<MongoLog>&);
  ~StraxFormatter();

  /// NOTE(review): presumably shuts the formatter down and reports final
  /// per-channel counts through `ret` — confirm against the definition.
  void Close(std::map<int,int>& ret);

  /// NOTE(review): presumably the worker loop run on the formatter's thread
  /// (see fThreadId / fCV / fBuffer) — confirm against the definition.
  void Process();

  /// Returns a snapshot {input buffer size, output buffer size} read from the
  /// two atomic counters. The two loads are independent, so the pair is not
  /// guaranteed to be mutually consistent. Const so that it can be called
  /// through a const reference or pointer.
  std::pair<int, int> GetBufferSize() const {return {fInputBufferSize.load(), fOutputBufferSize.load()};}

  /// NOTE(review): presumably writes accumulated per-channel data counts
  /// (fDataPerChan, guarded by fDPC_mutex) into `ret` — confirm.
  void GetDataPerChan(std::map<int, int>& ret);

  /// Hands a list of data packets to the formatter.
  /// NOTE(review): meaning of the int argument and of the return value is not
  /// visible in this header — confirm against the definition.
  int ReceiveDatapackets(std::list<std::unique_ptr<data_packet>>&, int);

private:
  // --- processing pipeline (definitions live outside this header) ---
  void ProcessDatapacket(std::unique_ptr<data_packet> dp);
  int ProcessEvent(std::u32string_view, const std::unique_ptr<data_packet>&,
      std::map<int, int>&);
  int ProcessChannel(std::u32string_view, int, int, uint32_t, int&, int,
      const std::unique_ptr<data_packet>&, std::map<int, int>&);
  void WriteOutChunk(int);
  void WriteOutChunks();
  void End();
  void GenerateArtificialDeadtime(int64_t, const std::shared_ptr<V1724>&);
  void AddFragmentToBuffer(std::string, uint32_t, int);

  // --- output naming / filesystem helpers ---
  std::vector<std::string> GetChunkNames(int);
  std::experimental::filesystem::path GetFilePath(const std::string&, bool=false);
  std::experimental::filesystem::path GetDirectoryPath(const std::string&, bool=false);
  std::string GetStringFormat(int id);
  void CreateEmpty(int);

  // --- configuration and sizes ---
  // None of the plain-typed members below have in-class initializers; they
  // are expected to be set by the constructor, which is defined elsewhere.
  int fEmptyVerified;
  std::function<long(std::shared_ptr<std::string>&, std::shared_ptr<std::string>&, long&)> fCompressor;
  int64_t fChunkLength; // ns
  int64_t fChunkOverlap; // ns
  int fFragmentBytes;
  int fStraxHeaderSize; // bytes
  int fFullFragmentSize;
  int fBufferNumChunks;
  int fWarnIfChunkOlderThan;
  unsigned fChunkNameLength;
  int64_t fFullChunkLength;
  std::string fOutputPath, fHostname, fFullHostname;
  std::shared_ptr<Options> fOptions;
  std::shared_ptr<MongoLog> fLog;

  // --- runtime state ---
  std::atomic_bool fActive;
  std::map<int, std::list<std::string>> fChunks, fOverlaps;
  std::map<int, int> fFailCounter;
  std::map<int, int> fDataPerChan;
  std::mutex fDPC_mutex; // NOTE(review): by its name, guards fDataPerChan — confirm

  // --- statistics / bookkeeping ---
  std::map<int, long> fBufferCounter;
  std::map<int, long> fFragsPerEvent;
  std::map<int, long> fEvPerDP;
  std::map<int, long> fBytesPerChunk;
  std::atomic_int fInputBufferSize, fOutputBufferSize; // reported by GetBufferSize()
  long fBytesProcessed;
  double fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime;

  // --- input queue and its synchronisation ---
  std::thread::id fThreadId;
  std::condition_variable fCV;
  std::mutex fBufferMutex; // NOTE(review): by its name, guards fBuffer — confirm
  std::list<std::unique_ptr<data_packet>> fBuffer;
  std::vector<int> fMutexWaitTime;
};
#endif