forked from antirez/dump1090
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrest.c
154 lines (131 loc) · 3.88 KB
/
rest.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
* rest.c
*
* Created on: Apr 18, 2015
* Author: borax
*/
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>

#include <curl/curl.h>
#include <json-c/json.h>

#include "rest.h"
/* NOTE(review): TIMEOUT is not referenced anywhere in the visible part of this
 * file. Presumably it was meant as a transfer timeout in milliseconds --
 * confirm the intended use or remove it. */
#define TIMEOUT 10000L
/*
 * Module-wide state of the REST sender: the sender thread, the lock guarding
 * the pending-message list, the list itself, and the HTTP client handle.
 */
struct {
// Threading
/* Thread created by initRestConnection() to run sendMessagesToRestInterface(). */
pthread_t sender_thread;
/* Held by addMessageToQueue() and popFirstMessageInQueue() while they modify the list. */
pthread_mutex_t thread_lock;
// Internal message list
/* Head of the singly linked list of pending messages; NULL when the list is empty. */
struct queue_message *first_message;
/* Tail of the list; new messages are linked in after this node. */
struct queue_message *last_message;
/* Target URL as passed to initRestConnection(); the pointer is stored, the string is not copied. */
char* uri;
/* libcurl easy handle created in initRestConnection() and used by sendMessage(). */
CURL *curl;
} Rest;
/* One node of the singly linked list of messages waiting to be sent. */
struct queue_message {
/* JSON payload of this entry; the sender loop attaches it to the outgoing batch object. */
json_object *message;
/* Next node in the list (the following, more recently queued message). */
struct queue_message *next;
};
/*
 * Forward declarations for the helpers of this file that are called before
 * their definitions appear. addMessageToQueue() is listed as well because
 * addRawMessageToRestQueue() calls it ahead of its definition.
 */
void addMessageToQueue(struct queue_message *msg);
struct queue_message *popFirstMessageInQueue(void);
void sendMessagesToRestInterface(void);
void sendMessage(json_object *json);
/* Initialize things */
int initRestConnection(char* uri) {
printf("Rest connection settings: %s\n", uri);
Rest.uri = uri;
Rest.first_message = NULL;
pthread_mutex_init(&Rest.thread_lock,NULL);
pthread_create(&Rest.sender_thread, NULL, sendMessagesToRestInterface, NULL);
Rest.curl = curl_easy_init();
if(Rest.curl) {
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Accept: application/json");
headers = curl_slist_append(headers, "Content-Type: application/json");
curl_easy_setopt(Rest.curl, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(Rest.curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(Rest.curl, CURLOPT_URL, uri);
// curl_easy_setopt(Rest.curl, CURLOPT_VERBOSE, 1L);
}
return 0;
}
/* Queue functionality */

/*
 * Wrap a raw message in a JSON object of the form
 *   { "message": <data as string>, "timestamp": <milliseconds since epoch> }
 * and append it to the send queue.
 *
 * data:   buffer holding the message. NOTE: the last byte (data[length-1]) is
 *         overwritten with '\0' so the buffer can be read as a C string.
 * length: number of bytes in `data`; must be at least 1.
 *
 * The call is a no-op when no URI has been configured, when `data` is NULL or
 * `length` is not positive, or when memory for the queue entry cannot be
 * allocated (the message is then dropped).
 */
void addRawMessageToRestQueue(char *data, int length) {
    if (!Rest.uri) {
        return;
    }
    /* Guard the data[length-1] write below against NULL / out-of-bounds access. */
    if (!data || length <= 0) {
        return;
    }

    struct timeval tv;
    gettimeofday(&tv, NULL);
    unsigned long long millisecondsSinceEpoch =
        (unsigned long long)(tv.tv_sec) * 1000 +
        (unsigned long long)(tv.tv_usec) / 1000;

    data[length - 1] = '\0';

    struct queue_message *curr = malloc(sizeof *curr);
    json_object *json = json_object_new_object();
    json_object *adsb_data = json_object_new_string(data);
    json_object *timestamp = json_object_new_int64((int64_t)millisecondsSinceEpoch);

    if (!curr || !json || !adsb_data || !timestamp) {
        /* Nothing has been linked together yet, so every piece is released
         * individually. */
        if (timestamp) {
            json_object_put(timestamp);
        }
        if (adsb_data) {
            json_object_put(adsb_data);
        }
        if (json) {
            json_object_put(json);
        }
        free(curr);
        return;
    }

    /* From here on `json` owns both child objects. */
    json_object_object_add(json, "message", adsb_data);
    json_object_object_add(json, "timestamp", timestamp);

    curr->message = json;
    curr->next = NULL; /* never hand a node with an indeterminate link to the queue */
    addMessageToQueue(curr);
}
/*
 * Append `msg` to the tail of the pending-message list.
 *
 * msg: node to enqueue. Its `next` link is reset to NULL because the node
 *      becomes the new tail of the list. A NULL `msg` is ignored.
 *
 * The list is modified while holding Rest.thread_lock.
 */
void addMessageToQueue(struct queue_message *msg) {
    if (!msg) {
        return;
    }
    /* The new tail must terminate the list, whatever the caller left in it. */
    msg->next = NULL;

    pthread_mutex_lock(&Rest.thread_lock);
    if (Rest.last_message) {
        Rest.last_message->next = msg;
    } else {
        /* Empty list: the node is head and tail at the same time. */
        Rest.first_message = msg;
    }
    Rest.last_message = msg;
    pthread_mutex_unlock(&Rest.thread_lock);
}
struct queue_message *popFirstMessageInQueue() {
pthread_mutex_lock(&Rest.thread_lock);
struct queue_message *msg = Rest.first_message;
if(Rest.first_message == Rest.last_message) {
Rest.first_message = 0;
Rest.last_message = 0;
} else {
Rest.first_message = Rest.first_message->next;
}
pthread_mutex_unlock(&Rest.thread_lock);
return msg;
}
/* REST */

/*
 * Sender loop: drains the message queue into a batch object and posts the
 * batch via sendMessage(). Never returns.
 *
 * Each queued message is added to the batch under the key "entry-<k>", with k
 * counting from 1 within the batch. A batch is sent when it holds 100 entries,
 * or when it holds at least one entry and more than 100 idle polls of 50 ms
 * (about 5 s) have accumulated since the previous send. The idle counter also
 * advances while the batch is empty, so the first message arriving after a
 * long quiet period is sent right away.
 */
void sendMessagesToRestInterface() {
    enum {
        BATCH_MAX_MESSAGES = 100,  /* entries per batch */
        FLUSH_IDLE_POLLS = 100,    /* idle polls before a partial batch is sent */
        IDLE_POLL_USEC = 50000     /* 50 ms between polls of an empty queue */
    };

    json_object *json = json_object_new_object();
    int message_counter = 1; /* index of the NEXT entry, i.e. entries in batch + 1 */
    int wait_counter = 0;

    while (1) {
        // Send batch in case we either have 100 messages in queue or we have waited 5s and have messages to send
        if (message_counter > BATCH_MAX_MESSAGES ||
            (message_counter > 1 && wait_counter > FLUSH_IDLE_POLLS)) {
            sendMessage(json);
            message_counter = 1;
            wait_counter = 0;
            /* Dropping the batch also releases every message object added to it. */
            json_object_put(json);
            json = json_object_new_object();
            continue;
        }

        /* Pop under the queue lock instead of peeking at Rest.first_message
         * without it. */
        struct queue_message *msg = popFirstMessageInQueue();
        if (msg) {
            /* Large enough for "entry-" plus any int; the former 9-byte buffer
             * overflowed on "entry-100". */
            char key[32];
            snprintf(key, sizeof key, "entry-%d", message_counter);
            /* The batch object takes over ownership of msg->message. */
            json_object_object_add(json, key, msg->message);
            message_counter++;
            free(msg);
            continue;
        }

        // Sleep for 50ms
        usleep(IDLE_POLL_USEC);
        /* Saturate just past the threshold so a long idle period cannot
         * overflow the counter. */
        if (wait_counter <= FLUSH_IDLE_POLLS) {
            wait_counter++;
        }
    }
}
/*
 * POST the serialized form of `json` using the shared curl handle Rest.curl.
 *
 * json: object to serialize and send. The serialized string is owned by
 *       `json` and is handed to libcurl by pointer, so `json` must not be
 *       modified or released until this function has returned.
 *
 * Failures (no handle, serialization failure, transfer error) are reported on
 * stderr; the function has no return value, so the batch is not retried.
 */
void sendMessage(json_object *json) {
    const char *payload;
    CURLcode rc;

    if (!Rest.curl) {
        fprintf(stderr, "Rest: no curl handle available, batch not sent\n");
        return;
    }
    if (!json) {
        return;
    }

    payload = json_object_to_json_string(json);
    if (!payload) {
        fprintf(stderr, "Rest: could not serialize batch, not sent\n");
        return;
    }

    rc = curl_easy_setopt(Rest.curl, CURLOPT_POSTFIELDS, payload);
    if (rc == CURLE_OK) {
        rc = curl_easy_perform(Rest.curl);
    }
    if (rc != CURLE_OK) {
        fprintf(stderr, "Rest: sending batch failed: %s\n", curl_easy_strerror(rc));
    }
}