Skip to content

Commit

Permalink
Implemented a resource bypass for localhost queries.
Browse files Browse the repository at this point in the history
  • Loading branch information
drolbr committed Nov 13, 2024
1 parent 91b4be6 commit ee37f50
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 20 deletions.
17 changes: 11 additions & 6 deletions src/overpass_api/dispatch/dispatcher_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ int main(int argc, char* argv[])
else if (!(strncmp(argv[argpos], "--allow-duplicate-queries=", 26)))
bit_limits = ((bit_limits & 0xfffffffc) | 0x2 |
(((std::string)argv[argpos]).substr(26) == "yes" ? 0x1 : 0));
else if (!(strncmp(argv[argpos], "--limit-client-zero=", 20)))
bit_limits = ((bit_limits & 0xfffffff3) | 0x8 |
(((std::string)argv[argpos]).substr(26) == "yes" ? 0 : 0x1));
else if (!(strncmp(argv[argpos], "--server-name=", 14)))
server_name = ((std::string)argv[argpos]).substr(14);
else
Expand Down Expand Up @@ -469,15 +472,15 @@ int main(int argc, char* argv[])
if (attic)
mode_.set_mode(Database_Meta_State::keep_attic);
Database_Meta_State::Mode mode = mode_.value_or_autodetect(db_dir);

files_to_manage = osm_base_settings().bin_idxs();

if (mode >= Database_Meta_State::keep_meta)
files_to_manage.insert(
files_to_manage.end(), meta_settings().bin_idxs().begin(), meta_settings().bin_idxs().end());
files_to_manage.end(), meta_settings().bin_idxs().begin(), meta_settings().bin_idxs().end());
else
suspicious_files_present |= assure_files_absent(db_dir, meta_settings().bin_idxs(), "--meta");

if (mode >= Database_Meta_State::keep_attic)
files_to_manage.insert(
files_to_manage.end(), attic_settings().bin_idxs().begin(), attic_settings().bin_idxs().end());
Expand Down Expand Up @@ -528,7 +531,7 @@ int main(int argc, char* argv[])
std::cerr<<"getrlimit(RLIMIT_NOFILE, ..) failed: "<<errno<<' '<<strerror(errno)<<'\n';
return errno;
}

Logger logger(db_dir);
Default_Dispatcher_Logger disp_logger(logger);
if (max_allowed_space <= 0)
Expand All @@ -549,7 +552,9 @@ int main(int argc, char* argv[])
dispatcher.set_rate_limit(rate_limit);
if (bit_limits & 0x2)
dispatcher.set_allow_duplicate_queries(bit_limits & 0x1);

if (bit_limits & 0x8)
dispatcher.set_exempt_client_zero(bit_limits & 0x4);

if (!server_name.empty())
{
try
Expand All @@ -561,7 +566,7 @@ int main(int argc, char* argv[])
std::cout<<"exception: "<<e.what()<<'\n';
}
}

dispatcher.standby_loop(0);
}
catch (File_Error e)
Expand Down
27 changes: 15 additions & 12 deletions src/template_db/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,11 @@ int Global_Resource_Planner::probe(pid_t pid, uint32 client_token, uint32 time_u
}

// Simple checks: is the query acceptable from a global point of view?
if (global_available_time < global_used_time ||
if ((client_token > 0 || !exempt_client_zero) &&
(global_available_time < global_used_time ||
time_units > (global_available_time - global_used_time)/2 ||
global_available_space < global_used_space ||
max_space > (global_available_space - global_used_space)/2)
max_space > (global_available_space - global_used_space)/2))
{
if (!handle || cur_time - handle->first_seen < 15)
return 0;
Expand Down Expand Up @@ -385,7 +386,7 @@ struct Running_Requests_Hashtable
sum += i.capacity();
return sum;
}

bool probe(const Hash_of_Running_Request& arg);

private:
Expand All @@ -409,7 +410,7 @@ bool Running_Requests_Hashtable::probe(const Hash_of_Running_Request& arg)
return true;
}
}

if (bucket.size() + 4 < 2*num_expired)
{
for (decltype(bucket.size()) i = 0; i < bucket.size(); )
Expand All @@ -419,7 +420,7 @@ bool Running_Requests_Hashtable::probe(const Hash_of_Running_Request& arg)
bucket[i] = bucket.back();
bucket.pop_back();
}
else
else
++i;
}
}
Expand All @@ -437,7 +438,7 @@ bool Running_Requests_Hashtable::probe(const Hash_of_Running_Request& arg)
}
else
bucket.push_back(arg);

return true;
}

Expand All @@ -463,7 +464,7 @@ Dispatcher::Dispatcher(
requests_load_rejected(0),
requests_rate_limited(0),
requests_as_duplicate_rejected(0),
global_resource_planner(total_available_time_units_, total_available_space_, 0, false)
global_resource_planner(total_available_time_units_, total_available_space_, 0, false, true)
{
signal(SIGPIPE, SIG_IGN);
signal(SIGTERM, sigterm);
Expand Down Expand Up @@ -747,7 +748,7 @@ void Dispatcher::standby_loop(uint64 milliseconds)
uint32 counter = 0;
uint32 idle_counter = 0;
Running_Requests_Hashtable hashtable_full_request;

while ((milliseconds == 0) || (counter < milliseconds/100))
{
if (sigterm_status() == Signal_Status::received)
Expand Down Expand Up @@ -776,7 +777,7 @@ void Dispatcher::standby_loop(uint64 milliseconds)
logger->idle_counter(idle_counter);
idle_counter = 0;
}

try
{
if (command == TERMINATE)
Expand Down Expand Up @@ -858,7 +859,7 @@ void Dispatcher::standby_loop(uint64 milliseconds)
uint64 max_allowed_space = (((uint64)arguments[2])<<32 | arguments[1]);
uint32 client_token = arguments[3];
uint64 request_full_hash = (((uint64)arguments[5])<<32 | arguments[4]);

if (global_resource_planner.get_allow_duplicate_queries() ||
hashtable_full_request.probe({ request_full_hash, time(0) + max_allowed_time, client_pid }))
{
Expand Down Expand Up @@ -984,6 +985,8 @@ void Dispatcher::standby_loop(uint64 milliseconds)
global_resource_planner.set_rate_limit(rate_limit_);
if (bit_limits & 0x2)
global_resource_planner.set_allow_duplicate_queries(bit_limits & 0x1);
if (bit_limits & 0x8)
global_resource_planner.set_exempt_client_zero(bit_limits & 0x4);

connection_per_pid.get(client_pid)->send_result(command);
}
Expand Down Expand Up @@ -1019,7 +1022,7 @@ void Dispatcher::standby_loop(uint64 milliseconds)
uint32 command = 0;
uint32 client_pid = 0;
connection_per_pid.poll_command_round_robin(command, client_pid);

try
{
if (command == WRITE_ROLLBACK)
Expand Down Expand Up @@ -1047,7 +1050,7 @@ void Dispatcher::standby_loop(uint64 milliseconds)

--terminate_countdown;
}

if (logger && (milliseconds == 0 || counter < milliseconds/100))
logger->terminate_triggered(terminate_countdown, writing_process);
}
Expand Down
8 changes: 7 additions & 1 deletion src/template_db/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ class Global_Resource_Planner
public:
Global_Resource_Planner(
uint32 global_available_time_, uint64 global_available_space_,
uint32 rate_limit_, bool allow_duplicate_queries_)
uint32 rate_limit_, bool allow_duplicate_queries_, bool exempt_client_zero_)
: global_used_time(0), global_available_time(global_available_time_),
global_used_space(0), global_available_space(global_available_space_),
rate_limit(rate_limit_), allow_duplicate_queries(allow_duplicate_queries_),
exempt_client_zero(exempt_client_zero_),
recent_average_used_time(15), recent_average_used_space(15),
last_update_time(0), last_used_time(0), last_used_space(0), last_counted(0),
average_used_time(0), average_used_space(0) {}
Expand All @@ -117,6 +118,7 @@ class Global_Resource_Planner
void set_total_available_space(uint64 global_available_space_) { global_available_space = global_available_space_; }
void set_rate_limit(uint rate_limit_) { rate_limit = rate_limit_; }
void set_allow_duplicate_queries(bool allow_duplicate_queries_) { allow_duplicate_queries = allow_duplicate_queries_; }
void set_exempt_client_zero(bool exempt_client_zero_) { exempt_client_zero = exempt_client_zero_; }

const std::vector< Reader_Entry >& get_active() const { return active; }
bool is_active(pid_t client_pid) const;
Expand All @@ -140,6 +142,7 @@ class Global_Resource_Planner
uint64 global_available_space;
uint32 rate_limit;
bool allow_duplicate_queries;
bool exempt_client_zero;

std::vector< uint32 > recent_average_used_time;
std::vector< uint64 > recent_average_used_space;
Expand Down Expand Up @@ -295,6 +298,9 @@ class Dispatcher
void set_allow_duplicate_queries(bool allow_duplicate_queries)
{ global_resource_planner.set_allow_duplicate_queries(allow_duplicate_queries); }

void set_exempt_client_zero(bool exempt_client_zero)
{ global_resource_planner.set_exempt_client_zero(exempt_client_zero); }

private:
Dispatcher_Socket socket;
Connection_Per_Pid_Map connection_per_pid;
Expand Down
2 changes: 1 addition & 1 deletion src/template_db/dispatcher_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ void Dispatcher_Client::request_read_and_idx(
}
if (ack == Dispatcher::RATE_LIMITED)
throw File_Error(0, dispatcher_share_name, "Dispatcher_Client::request_read_and_idx::rate_limited");
else if (ack == Dispatcher::QUERY_REJECTED)
else if (ack == Dispatcher::QUERY_REJECTED || ack == 0)
throw File_Error(0, dispatcher_share_name, "Dispatcher_Client::request_read_and_idx::timeout");
else if (ack == Dispatcher::DUPLICATE_QUERY)
throw File_Error(0, dispatcher_share_name, "Dispatcher_Client::request_read_and_idx::duplicate_query");
Expand Down

0 comments on commit ee37f50

Please sign in to comment.