From 0e386d45560ab2e4e10f35887501dfd2c6fa374e Mon Sep 17 00:00:00 2001 From: Shuaibing Zhao Date: Wed, 1 Nov 2023 11:13:06 +0800 Subject: [PATCH] [feature]add ratelimiter using async_simple coroutine (#482) --- include/ylt/coro_io/rate_limiter.hpp | 175 ++++++++++++++++++++++++ src/coro_io/tests/CMakeLists.txt | 1 + src/coro_io/tests/test_rate_limiter.cpp | 92 +++++++++++++ 3 files changed, 268 insertions(+) create mode 100644 include/ylt/coro_io/rate_limiter.hpp create mode 100644 src/coro_io/tests/test_rate_limiter.cpp diff --git a/include/ylt/coro_io/rate_limiter.hpp b/include/ylt/coro_io/rate_limiter.hpp new file mode 100644 index 000000000..b9b96fd1e --- /dev/null +++ b/include/ylt/coro_io/rate_limiter.hpp @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2023, Alibaba Group Holding Limited; + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace coro_io { +class rate_limiter { + public: + async_simple::coro::Lazy acquire(int permits) { + std::chrono::milliseconds wait_mills; + { + auto scope = co_await this->lock_.coScopedLock(); + wait_mills = reserve_and_get_wait_length(permits, current_time_mills()); + } + co_await coro_io::sleep_for(wait_mills); + co_return wait_mills; + } + async_simple::coro::Lazy set_rate(double permitsPerSecond) { + auto scope = co_await this->lock_.coScopedLock(); + do_set_rate(permitsPerSecond, current_time_mills()); + } + + protected: + virtual void do_set_rate( + double permitsPerSecond, + std::chrono::steady_clock::time_point now_micros) = 0; + virtual std::chrono::steady_clock::time_point reserve_earliest_available( + int permits, std::chrono::steady_clock::time_point now_micros) = 0; + std::chrono::steady_clock::time_point current_time_mills() { + return std::chrono::steady_clock::now(); + } + + private: + std::chrono::milliseconds reserve_and_get_wait_length( + int permits, std::chrono::steady_clock::time_point now_micros) { + std::chrono::steady_clock::time_point moment_available = + reserve_earliest_available(permits, now_micros); + std::chrono::milliseconds diff_mills = + std::chrono::duration_cast(moment_available - + now_micros); + return std::max(diff_mills, std::chrono::milliseconds(0)); + } + + async_simple::coro::SpinLock lock_; +}; + +class abstract_smooth_rate_limiter : public rate_limiter { + protected: + virtual void do_set_rate(double permits_per_second, + double stable_internal_micros) = 0; + virtual std::chrono::milliseconds stored_permits_to_wait_time( + double stored_permits, double permits_to_take) = 0; + virtual double cool_down_internal_micros() = 0; + void resync(std::chrono::steady_clock::time_point now_micros) { + // if next_free_ticket is in the past, resync to now + ELOG_DEBUG << "now micros: " + << std::chrono::duration_cast( + now_micros.time_since_epoch()) + .count() + << ", next_free_ticket_micros_: " + << std::chrono::duration_cast( + this->next_free_ticket_micros_.time_since_epoch()) + .count(); + if (now_micros > this->next_free_ticket_micros_) { + std::chrono::milliseconds diff_mills = + std::chrono::duration_cast( + now_micros - this->next_free_ticket_micros_); + double newPermits = diff_mills.count() / cool_down_internal_micros(); + this->stored_permits_ = + std::min(this->max_permits_, this->stored_permits_ + newPermits); + this->next_free_ticket_micros_ = now_micros; + } + } + void do_set_rate(double permits_per_second, + std::chrono::steady_clock::time_point now_micros) override { + resync(now_micros); + double stable_internal_micros = 1000 / permits_per_second; + this->stable_internal_micros_ = stable_internal_micros; + do_set_rate(permits_per_second, stable_internal_micros); + } + std::chrono::steady_clock::time_point reserve_earliest_available( + int required_permits, std::chrono::steady_clock::time_point now_micros) { + resync(now_micros); + std::chrono::steady_clock::time_point return_value = + this->next_free_ticket_micros_; + double stored_permits_to_spend = + std::min((double)required_permits, this->stored_permits_); + double fresh_permits = required_permits - stored_permits_to_spend; + std::chrono::milliseconds wait_micros = + stored_permits_to_wait_time(this->stored_permits_, + stored_permits_to_spend) + + std::chrono::milliseconds( + (int64_t)(fresh_permits * this->stable_internal_micros_)); + this->next_free_ticket_micros_ += wait_micros; + this->stored_permits_ -= stored_permits_to_spend; + return return_value; + } + + /** + * The currently stored permits. + */ + double stored_permits_ = 0; + /** + * The maximum number of stored permits. + */ + double max_permits_ = 0; + /** + * The interval between two unit requests, at our stable rate. E.g., a stable + * rate of 5 permits per second has a stable internal of 200ms. + */ + double stable_internal_micros_ = 0; + /** + * The time when the next request (no matter its size) will be granted. After + * granting a request, this is pushed further in the future. Large requests + * push this further than small requests. + */ + std::chrono::steady_clock::time_point next_free_ticket_micros_; +}; + +class smooth_bursty_rate_limiter : public abstract_smooth_rate_limiter { + public: + smooth_bursty_rate_limiter(double permits_per_second) { + this->max_burst_seconds_ = 1.0; + async_simple::coro::syncAwait(set_rate(permits_per_second)); + } + + protected: + void do_set_rate(double permits_per_second, double stable_internal_micros) { + double old_max_permits = this->max_permits_; + this->max_permits_ = this->max_burst_seconds_ * permits_per_second; + this->stored_permits_ = + (0 == old_max_permits) + ? 0 + : this->stored_permits_ * this->max_permits_ / old_max_permits; + ELOG_DEBUG << "max_permits_: " << this->max_permits_ + << ", stored_permits_:" << this->stored_permits_; + } + + std::chrono::milliseconds stored_permits_to_wait_time( + double stored_permits, double permits_to_take) { + return std::chrono::milliseconds(0); + } + + double cool_down_internal_micros() { return this->stable_internal_micros_; } + + private: + /** + * The work(permits) of how many seconds can be saved up if the rate_limiter + * is unused. + */ + double max_burst_seconds_ = 0; +}; +} // namespace coro_io \ No newline at end of file diff --git a/src/coro_io/tests/CMakeLists.txt b/src/coro_io/tests/CMakeLists.txt index 3fef160cc..4d3723c67 100644 --- a/src/coro_io/tests/CMakeLists.txt +++ b/src/coro_io/tests/CMakeLists.txt @@ -3,6 +3,7 @@ add_executable(coro_io_test test_corofile.cpp test_channel.cpp test_client_pool.cpp + test_rate_limiter.cpp main.cpp ) if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_SYSTEM_NAME MATCHES "Windows") # mingw-w64 diff --git a/src/coro_io/tests/test_rate_limiter.cpp b/src/coro_io/tests/test_rate_limiter.cpp new file mode 100644 index 000000000..fa19838f2 --- /dev/null +++ b/src/coro_io/tests/test_rate_limiter.cpp @@ -0,0 +1,92 @@ +#include +#include + +#include +#include + +int64_t current_time_mills() { + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); + std::chrono::milliseconds ms = + std::chrono::duration_cast( + now.time_since_epoch()); + return ms.count(); +} + +TEST_CASE("test smooth_bursty_rate_limiter simple") { + coro_io::smooth_bursty_rate_limiter rate_limiter(1); + std::chrono::milliseconds wait_time = + async_simple::coro::syncAwait(rate_limiter.acquire(1)); + CHECK_EQ(0, wait_time.count()); +} + +TEST_CASE("test smooth_bursty_rate_limiter acquire multi permits") { + double permits_per_second = 10.0; + int permits_to_acquire = 5; + double expected_cost = (permits_to_acquire) * (1 / permits_per_second); + double cost_diff = 0.1; + + coro_io::smooth_bursty_rate_limiter rate_limiter(permits_per_second); + + // acquire much enough, next acquire will wait + async_simple::coro::syncAwait(rate_limiter.acquire(permits_to_acquire)); + + int64_t start_mills = current_time_mills(); + async_simple::coro::syncAwait(rate_limiter.acquire(1)); + double cost = (current_time_mills() - start_mills) / 1000.0; + CHECK(cost > expected_cost - cost_diff); + CHECK(cost < expected_cost + cost_diff); +} + +TEST_CASE("test smooth_bursty_rate_limiter single thread") { + double permits_per_second = 10.0; + int permits_to_acquire = 5; + double expected_cost = (permits_to_acquire - 1) * (1 / permits_per_second); + double cost_diff = 0.1; + + int64_t start_mills = current_time_mills(); + coro_io::smooth_bursty_rate_limiter rate_limiter(permits_per_second); + for (int i = 0; i < permits_to_acquire; i++) { + std::chrono::milliseconds wait_mills = + async_simple::coro::syncAwait(rate_limiter.acquire(1)); + ELOG_INFO << "wait for " << wait_mills.count(); + } + double cost = (current_time_mills() - start_mills) / 1000.0; + + CHECK(cost > expected_cost - cost_diff); + CHECK(cost < expected_cost + cost_diff); +} + +TEST_CASE("test smooth_bursty_rate_limiter multi coroutine") { + double permits_per_second = 100.0; + int num_of_coroutine = 5; + int permits_to_acquire_every_coroutine = 5; + double expected_cost = + (num_of_coroutine * permits_to_acquire_every_coroutine - 1) * + (1 / permits_per_second); + double cost_diff = 0.1; + + coro_io::smooth_bursty_rate_limiter rate_limiter(permits_per_second); + auto consumer = [&](int coroutine_num) -> async_simple::coro::Lazy { + for (int i = 0; i < permits_to_acquire_every_coroutine; i++) { + co_await rate_limiter.acquire(1); + ELOG_INFO << "coroutine " << coroutine_num << " acquired"; + } + }; + + auto consumer_list_lazy = [&]() -> async_simple::coro::Lazy { + std::vector> lazy_list; + for (int i = 0; i < num_of_coroutine; i++) { + lazy_list.push_back(consumer(i)); + } + co_await collectAllPara(std::move(lazy_list)); + }; + + int64_t start_mills = current_time_mills(); + syncAwait(consumer_list_lazy().via( + coro_io::g_block_io_context_pool(1) + .get_executor())); + double cost = (current_time_mills() - start_mills) / 1000.0; + + CHECK(cost > expected_cost - cost_diff); + CHECK(cost < expected_cost + cost_diff); +} \ No newline at end of file