Skip to content

Commit

Permalink
[feature]add ratelimiter using async_simple coroutine (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenRi authored Nov 1, 2023
1 parent e4efeff commit 0e386d4
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 0 deletions.
175 changes: 175 additions & 0 deletions include/ylt/coro_io/rate_limiter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright (c) 2023, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <async_simple/coro/Lazy.h>
#include <async_simple/coro/SpinLock.h>
#include <async_simple/coro/SyncAwait.h>

#include <algorithm>
#include <chrono>
#include <iostream>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/easylog.hpp>

namespace coro_io {
class rate_limiter {
public:
async_simple::coro::Lazy<std::chrono::milliseconds> acquire(int permits) {
std::chrono::milliseconds wait_mills;
{
auto scope = co_await this->lock_.coScopedLock();
wait_mills = reserve_and_get_wait_length(permits, current_time_mills());
}
co_await coro_io::sleep_for(wait_mills);
co_return wait_mills;
}
async_simple::coro::Lazy<void> set_rate(double permitsPerSecond) {
auto scope = co_await this->lock_.coScopedLock();
do_set_rate(permitsPerSecond, current_time_mills());
}

protected:
virtual void do_set_rate(
double permitsPerSecond,
std::chrono::steady_clock::time_point now_micros) = 0;
virtual std::chrono::steady_clock::time_point reserve_earliest_available(
int permits, std::chrono::steady_clock::time_point now_micros) = 0;
std::chrono::steady_clock::time_point current_time_mills() {
return std::chrono::steady_clock::now();
}

private:
std::chrono::milliseconds reserve_and_get_wait_length(
int permits, std::chrono::steady_clock::time_point now_micros) {
std::chrono::steady_clock::time_point moment_available =
reserve_earliest_available(permits, now_micros);
std::chrono::milliseconds diff_mills =
std::chrono::duration_cast<std::chrono::milliseconds>(moment_available -
now_micros);
return std::max(diff_mills, std::chrono::milliseconds(0));
}

async_simple::coro::SpinLock lock_;
};

class abstract_smooth_rate_limiter : public rate_limiter {
protected:
virtual void do_set_rate(double permits_per_second,
double stable_internal_micros) = 0;
virtual std::chrono::milliseconds stored_permits_to_wait_time(
double stored_permits, double permits_to_take) = 0;
virtual double cool_down_internal_micros() = 0;
void resync(std::chrono::steady_clock::time_point now_micros) {
// if next_free_ticket is in the past, resync to now
ELOG_DEBUG << "now micros: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
now_micros.time_since_epoch())
.count()
<< ", next_free_ticket_micros_: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
this->next_free_ticket_micros_.time_since_epoch())
.count();
if (now_micros > this->next_free_ticket_micros_) {
std::chrono::milliseconds diff_mills =
std::chrono::duration_cast<std::chrono::milliseconds>(
now_micros - this->next_free_ticket_micros_);
double newPermits = diff_mills.count() / cool_down_internal_micros();
this->stored_permits_ =
std::min(this->max_permits_, this->stored_permits_ + newPermits);
this->next_free_ticket_micros_ = now_micros;
}
}
void do_set_rate(double permits_per_second,
std::chrono::steady_clock::time_point now_micros) override {
resync(now_micros);
double stable_internal_micros = 1000 / permits_per_second;
this->stable_internal_micros_ = stable_internal_micros;
do_set_rate(permits_per_second, stable_internal_micros);
}
std::chrono::steady_clock::time_point reserve_earliest_available(
int required_permits, std::chrono::steady_clock::time_point now_micros) {
resync(now_micros);
std::chrono::steady_clock::time_point return_value =
this->next_free_ticket_micros_;
double stored_permits_to_spend =
std::min((double)required_permits, this->stored_permits_);
double fresh_permits = required_permits - stored_permits_to_spend;
std::chrono::milliseconds wait_micros =
stored_permits_to_wait_time(this->stored_permits_,
stored_permits_to_spend) +
std::chrono::milliseconds(
(int64_t)(fresh_permits * this->stable_internal_micros_));
this->next_free_ticket_micros_ += wait_micros;
this->stored_permits_ -= stored_permits_to_spend;
return return_value;
}

/**
* The currently stored permits.
*/
double stored_permits_ = 0;
/**
* The maximum number of stored permits.
*/
double max_permits_ = 0;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable
* rate of 5 permits per second has a stable internal of 200ms.
*/
double stable_internal_micros_ = 0;
/**
* The time when the next request (no matter its size) will be granted. After
* granting a request, this is pushed further in the future. Large requests
* push this further than small requests.
*/
std::chrono::steady_clock::time_point next_free_ticket_micros_;
};

class smooth_bursty_rate_limiter : public abstract_smooth_rate_limiter {
public:
smooth_bursty_rate_limiter(double permits_per_second) {
this->max_burst_seconds_ = 1.0;
async_simple::coro::syncAwait(set_rate(permits_per_second));
}

protected:
void do_set_rate(double permits_per_second, double stable_internal_micros) {
double old_max_permits = this->max_permits_;
this->max_permits_ = this->max_burst_seconds_ * permits_per_second;
this->stored_permits_ =
(0 == old_max_permits)
? 0
: this->stored_permits_ * this->max_permits_ / old_max_permits;
ELOG_DEBUG << "max_permits_: " << this->max_permits_
<< ", stored_permits_:" << this->stored_permits_;
}

std::chrono::milliseconds stored_permits_to_wait_time(
double stored_permits, double permits_to_take) {
return std::chrono::milliseconds(0);
}

double cool_down_internal_micros() { return this->stable_internal_micros_; }

private:
/**
* The work(permits) of how many seconds can be saved up if the rate_limiter
* is unused.
*/
double max_burst_seconds_ = 0;
};
} // namespace coro_io
1 change: 1 addition & 0 deletions src/coro_io/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_executable(coro_io_test
test_corofile.cpp
test_channel.cpp
test_client_pool.cpp
test_rate_limiter.cpp
main.cpp
)
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_SYSTEM_NAME MATCHES "Windows") # mingw-w64
Expand Down
92 changes: 92 additions & 0 deletions src/coro_io/tests/test_rate_limiter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#include <async_simple/coro/Collect.h>
#include <doctest.h>

#include <ylt/coro_io/io_context_pool.hpp>
#include <ylt/coro_io/rate_limiter.hpp>

int64_t current_time_mills() {
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
std::chrono::milliseconds ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch());
return ms.count();
}

TEST_CASE("test smooth_bursty_rate_limiter simple") {
coro_io::smooth_bursty_rate_limiter rate_limiter(1);
std::chrono::milliseconds wait_time =
async_simple::coro::syncAwait(rate_limiter.acquire(1));
CHECK_EQ(0, wait_time.count());
}

TEST_CASE("test smooth_bursty_rate_limiter acquire multi permits") {
double permits_per_second = 10.0;
int permits_to_acquire = 5;
double expected_cost = (permits_to_acquire) * (1 / permits_per_second);
double cost_diff = 0.1;

coro_io::smooth_bursty_rate_limiter rate_limiter(permits_per_second);

// acquire much enough, next acquire will wait
async_simple::coro::syncAwait(rate_limiter.acquire(permits_to_acquire));

int64_t start_mills = current_time_mills();
async_simple::coro::syncAwait(rate_limiter.acquire(1));
double cost = (current_time_mills() - start_mills) / 1000.0;
CHECK(cost > expected_cost - cost_diff);
CHECK(cost < expected_cost + cost_diff);
}

TEST_CASE("test smooth_bursty_rate_limiter single thread") {
double permits_per_second = 10.0;
int permits_to_acquire = 5;
double expected_cost = (permits_to_acquire - 1) * (1 / permits_per_second);
double cost_diff = 0.1;

int64_t start_mills = current_time_mills();
coro_io::smooth_bursty_rate_limiter rate_limiter(permits_per_second);
for (int i = 0; i < permits_to_acquire; i++) {
std::chrono::milliseconds wait_mills =
async_simple::coro::syncAwait(rate_limiter.acquire(1));
ELOG_INFO << "wait for " << wait_mills.count();
}
double cost = (current_time_mills() - start_mills) / 1000.0;

CHECK(cost > expected_cost - cost_diff);
CHECK(cost < expected_cost + cost_diff);
}

TEST_CASE("test smooth_bursty_rate_limiter multi coroutine") {
double permits_per_second = 100.0;
int num_of_coroutine = 5;
int permits_to_acquire_every_coroutine = 5;
double expected_cost =
(num_of_coroutine * permits_to_acquire_every_coroutine - 1) *
(1 / permits_per_second);
double cost_diff = 0.1;

coro_io::smooth_bursty_rate_limiter rate_limiter(permits_per_second);
auto consumer = [&](int coroutine_num) -> async_simple::coro::Lazy<void> {
for (int i = 0; i < permits_to_acquire_every_coroutine; i++) {
co_await rate_limiter.acquire(1);
ELOG_INFO << "coroutine " << coroutine_num << " acquired";
}
};

auto consumer_list_lazy = [&]() -> async_simple::coro::Lazy<void> {
std::vector<async_simple::coro::Lazy<void>> lazy_list;
for (int i = 0; i < num_of_coroutine; i++) {
lazy_list.push_back(consumer(i));
}
co_await collectAllPara(std::move(lazy_list));
};

int64_t start_mills = current_time_mills();
syncAwait(consumer_list_lazy().via(
coro_io::g_block_io_context_pool<coro_io::multithread_context_pool>(1)
.get_executor()));
double cost = (current_time_mills() - start_mills) / 1000.0;

CHECK(cost > expected_cost - cost_diff);
CHECK(cost < expected_cost + cost_diff);
}

0 comments on commit 0e386d4

Please sign in to comment.