forked from blechschmidt/massdns
-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy path: timed_ring.h
153 lines (135 loc) · 4.86 KB
/
timed_ring.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#ifndef MASSRESOLVER_TIMED_RING_H
#define MASSRESOLVER_TIMED_RING_H
#include <assert.h>   // assert() in timed_ring_init
#include <stdbool.h>  // bool member of timed_ring_t
#include <stddef.h>   // size_t
#include <stdint.h>
#include <stdlib.h>   // free()
#include <time.h>     // clock_gettime(), struct timespec
#include "hashmap.h"
#include "security.h"
#include "list.h"
// The timed ring is a circular buffer allowing to efficiently process time-based events with a certain precision.
#define TIMED_RING_S 1000000000
#define TIMED_RING_MS 1000000
#define TIMED_RING_US 1000
#define TIMED_RING_NS 1
// One slot of the ring: holds up to bucket_capacity entries inline in `data`;
// any entries beyond that spill over into the `following` linked list.
typedef struct
{
    void **data;             // inline entry array (a NULL slot marks a removed entry)
    size_t count;            // total entries in this bucket, including spilled ones
    single_list_t following; // entries exceeding the bucket capacity
} timed_ring_bucket_t;
// Circular buffer of time buckets. Covers a total time span of
// bucket_count * precision nanoseconds; events beyond the span are clamped
// into the last bucket (see timed_ring_add).
typedef struct {
    size_t bucket_count; // number of buckets
    size_t precision; // number of nanoseconds per bucket
    struct timespec last_time; // last handled time, rounded down to precision
    size_t bucket_capacity; // inline entries per bucket before list spill-over
    size_t next_bucket; // The index of the bucket that is supposed to be executed next.
    timed_ring_bucket_t *buckets; // array of bucket_count buckets
    bool efficient; // NOTE(review): set but never read in this chunk — purpose unclear, verify against callers
} timed_ring_t;
// Initialize a timed ring covering a total time span of bucket_count * precision ns.
//
// ring:            ring to initialize (must be non-NULL).
// bucket_count:    number of buckets; must be non-zero (used as a modulus later).
// precision:       nanoseconds per bucket; must be non-zero and divide one second evenly.
// bucket_capacity: inline entries per bucket before spilling into a list; must be non-zero.
void timed_ring_init(timed_ring_t* ring, size_t bucket_count, size_t precision, size_t bucket_capacity)
{
    // Guard against later division/modulo by zero (UB) in add/handle paths.
    assert(bucket_count != 0);
    assert(bucket_capacity != 0);
    assert(precision != 0 && TIMED_RING_S % precision == 0);
    ring->bucket_count = bucket_count;
    ring->precision = precision;
    ring->bucket_capacity = bucket_capacity;
    ring->buckets = safe_malloc(sizeof(*ring->buckets) * ring->bucket_count);
    ring->next_bucket = 0;
    ring->efficient = false;
    for(size_t i = 0; i < ring->bucket_count; i++)
    {
        ring->buckets[i].count = 0;
        single_list_init(&ring->buckets[i].following);
        ring->buckets[i].data = safe_malloc(sizeof(void*) * bucket_capacity);
    }
    clock_gettime(CLOCK_MONOTONIC, &ring->last_time);
    // Round the reference time down to the ring's precision so elapsed-bucket
    // arithmetic in add/handle stays aligned to bucket boundaries.
    ring->last_time.tv_nsec = (ring->last_time.tv_nsec / precision) * precision;
}
// Empty every bucket (inline entries and spill-over lists) without releasing
// any of the ring's memory; the ring remains usable afterwards.
void timed_ring_clear(timed_ring_t* ring)
{
    timed_ring_bucket_t *const end = ring->buckets + ring->bucket_count;
    for(timed_ring_bucket_t *bucket = ring->buckets; bucket != end; bucket++)
    {
        bucket->count = 0;
        single_list_clear(&bucket->following);
    }
}
// Release all memory owned by the ring. Safe to call on a NULL ring, an
// uninitialized-to-NULL ring, or (after this fix) a ring that was already
// destroyed: the buckets pointer is reset to NULL so the guard below catches
// repeated calls instead of double-freeing a dangling pointer.
void timed_ring_destroy(timed_ring_t* ring)
{
    if(!ring || !ring->buckets)
    {
        return;
    }
    for(size_t i = 0; i < ring->bucket_count; i++)
    {
        single_list_clear(&ring->buckets[i].following);
        free(ring->buckets[i].data);
    }
    free(ring->buckets);
    ring->buckets = NULL; // make destroy idempotent; the guard above now works on re-entry
}
// Cancel a pending event. add_ptr is the slot pointer previously returned by
// timed_ring_add; writing NULL tombstones the entry, and the handler skips
// NULL entries when the bucket fires. The slot itself is reclaimed lazily.
void timed_ring_remove(timed_ring_t *ring, void **add_ptr)
{
    (void)ring; // unused; kept for interface symmetry with the other ring functions
    *add_ptr = NULL;
}
// Schedule an event `in` nanoseconds from now.
//
// ring: initialized ring.
// in:   offset from the current monotonic time, in nanoseconds.
// ptr:  opaque event payload passed to the callback; must not be NULL
//       (NULL is the tombstone value used by timed_ring_remove).
//
// Returns a pointer to the stored slot, usable with timed_ring_remove.
// Events further out than the ring's span are clamped into the last bucket.
void **timed_ring_add(timed_ring_t *ring, time_t in, void *ptr)
{
    struct timespec expiry;
    clock_gettime(CLOCK_MONOTONIC, &expiry);
    // Split the offset into whole seconds and a sub-second remainder before
    // touching tv_nsec: adding the full nanosecond offset to tv_nsec directly
    // overflows a 32-bit `long` tv_nsec for offsets beyond ~2.1 seconds.
    expiry.tv_sec += in / TIMED_RING_S;
    expiry.tv_nsec += in % TIMED_RING_S;
    if(expiry.tv_nsec >= TIMED_RING_S)
    {
        expiry.tv_sec += 1;
        expiry.tv_nsec -= TIMED_RING_S;
    }
    expiry.tv_nsec = (expiry.tv_nsec / ring->precision) * ring->precision;
    time_t elapsed_ns = (expiry.tv_sec - ring->last_time.tv_sec) * TIMED_RING_S
        + (expiry.tv_nsec - ring->last_time.tv_nsec);
    if(elapsed_ns < 0)
    {
        // A non-positive offset (or rounding) can place the expiry before
        // last_time; without this clamp the signed->unsigned conversion
        // below would yield a huge bucket offset.
        elapsed_ns = 0;
    }
    size_t elapsed_buckets = (size_t)elapsed_ns / ring->precision;
    elapsed_buckets = min(ring->bucket_count - 1, elapsed_buckets); // clamp to ring span
    size_t bucket = (ring->next_bucket + elapsed_buckets) % ring->bucket_count;
    if(ring->buckets[bucket].count < ring->bucket_capacity)
    {
        // Fast path: store inline in the bucket's array.
        ring->buckets[bucket].data[ring->buckets[bucket].count] = ptr;
        return &ring->buckets[bucket].data[ring->buckets[bucket].count++];
    }
    else
    {
        // Slow path: bucket is full, spill into its overflow list.
        ring->buckets[bucket].count++;
        return &single_list_push_back(&ring->buckets[bucket].following, ptr)->data;
    }
}
// Fire the callback for every live entry in one bucket, then leave the
// bucket empty. NULL entries (tombstones written by timed_ring_remove) are
// skipped. Declared static inline: this is a header-internal helper.
static inline void timed_ring_handle_helper(timed_ring_t *ring, timed_ring_bucket_t *bucket, void (*callback)(void*))
{
    // Copy everything because the callback function might add another entry to the same bucket
    // This is an important limitation that needs to be considered:
    // Re-adding more than one event to the ring per callback call might result in an endless loop.
    //
    // Order matters here: the overflow list is detached and the count reset
    // BEFORE any callback runs, so callbacks that call timed_ring_add see a
    // fresh, empty bucket rather than the one being drained.
    single_list_t iter_list = bucket->following;
    single_list_init(&bucket->following);
    // count may exceed bucket_capacity when entries spilled over; only the
    // first bucket_capacity entries live in the inline array.
    size_t count = min(bucket->count, ring->bucket_capacity);
    bucket->count = 0;
    for(size_t j = 0; j < count; j++)
    {
        if(bucket->data[j] != NULL) // NULL = removed entry, skip
        {
            callback(bucket->data[j]);
        }
    }
    // Drain the detached overflow list, freeing its nodes as we go.
    single_list_foreach_free(iter_list, elm)
    {
        if(elm->data != NULL) // NULL = removed entry, skip
        {
            callback(elm->data);
        }
    }
}
// Fire the callback for every entry whose bucket has expired since the last
// call, then advance the ring's cursor and reference time.
//
// ring:     initialized ring.
// callback: invoked once per live (non-tombstoned) entry; may re-add entries
//           (see the re-entrancy caveat in timed_ring_handle_helper).
void timed_ring_handle(timed_ring_t *ring, void (*callback)(void*))
{
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    // Round down to the ring's precision, mirroring timed_ring_init/add.
    now.tv_nsec = (now.tv_nsec / ring->precision) * ring->precision;
    // Use TIMED_RING_S (ns per second) rather than a repeated magic constant.
    time_t elapsed_ns = (now.tv_sec - ring->last_time.tv_sec) * TIMED_RING_S
        + (now.tv_nsec - ring->last_time.tv_nsec);
    size_t elapsed_buckets = elapsed_ns / ring->precision;
    // If more than a full revolution elapsed, every bucket fires exactly once.
    elapsed_buckets = min(ring->bucket_count, elapsed_buckets);
    for(size_t i = 0; i < elapsed_buckets; i++)
    {
        timed_ring_handle_helper(ring, &ring->buckets[(ring->next_bucket + i) % ring->bucket_count], callback);
    }
    ring->next_bucket = (ring->next_bucket + elapsed_buckets) % ring->bucket_count;
    ring->last_time = now;
}
#endif //MASSRESOLVER_TIMED_RING_H