Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]add ratelimiter using async_simple coroutine #482

Merged
merged 15 commits into from
Nov 1, 2023
Merged
157 changes: 157 additions & 0 deletions include/ylt/coro_io/rate_limiter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2023, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <async_simple/coro/Lazy.h>
#include <async_simple/coro/Mutex.h>
#include <async_simple/coro/Sleep.h>
#include <async_simple/coro/SyncAwait.h>

#include <algorithm>
#include <chrono>
#include <iostream>
#include <ylt/easylog.hpp>

namespace coro_io {
class RateLimiter {
StephenRi marked this conversation as resolved.
Show resolved Hide resolved
public:
async_simple::coro::Lazy<double> acquire(int permits) {
co_await this->mutex_.coLock();
long wait_mills = reserveAndGetWaitLength(permits, currentTimeMills());
StephenRi marked this conversation as resolved.
Show resolved Hide resolved
this->mutex_.unlock();
co_await async_simple::coro::sleep(std::chrono::milliseconds(wait_mills));
StephenRi marked this conversation as resolved.
Show resolved Hide resolved
co_return 1.0 * wait_mills / 1000;
}
async_simple::coro::Lazy<void> setRate(double permitsPerSecond) {
co_await this->mutex_.coLock();
doSetRate(permitsPerSecond, currentTimeMills());
this->mutex_.unlock();
}

protected:
virtual void doSetRate(double permitsPerSecond, long now_micros) = 0;
virtual long reserveEarliestAvailable(int permits, long now_micros) = 0;
long currentTimeMills() {
StephenRi marked this conversation as resolved.
Show resolved Hide resolved
std::chrono::system_clock::time_point now =
std::chrono::system_clock::now();
StephenRi marked this conversation as resolved.
Show resolved Hide resolved
std::chrono::milliseconds ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch());
return ms.count();
}

private:
long reserveAndGetWaitLength(int permits, long now_micros) {
long moment_available = reserveEarliestAvailable(permits, now_micros);
return std::max(moment_available - now_micros, 0L);
}

async_simple::coro::Mutex mutex_;
StephenRi marked this conversation as resolved.
Show resolved Hide resolved
};

class AbstractSmoothRateLimiter : public RateLimiter {
protected:
virtual void doSetRate(double permits_per_second,
double stable_internal_micros) = 0;
virtual long storedPermitsToWaitTime(double stored_permits,
double permits_to_take) = 0;
virtual double coolDownInternalMicros() = 0;
void resync(long now_micros) {
// if next_free_ticket is in the past, resync to now
ELOG_DEBUG << "now micros: " << now_micros << ", next_free_ticket_micros_: "
<< this->next_free_ticket_micros_;
if (now_micros > this->next_free_ticket_micros_) {
double newPermits = (now_micros - this->next_free_ticket_micros_) /
coolDownInternalMicros();
this->stored_permits_ =
std::min(this->max_permits_, this->stored_permits_ + newPermits);
this->next_free_ticket_micros_ = now_micros;
}
}
void doSetRate(double permits_per_second, long now_micros) override {
resync(now_micros);
double stable_internal_micros = 1000 / permits_per_second;
this->stable_internal_micros_ = stable_internal_micros;
doSetRate(permits_per_second, stable_internal_micros);
}
long reserveEarliestAvailable(int required_permits, long now_micros) {
resync(now_micros);
long return_value = this->next_free_ticket_micros_;
double stored_permits_to_spend =
std::min((double)required_permits, this->stored_permits_);
double fresh_permits = required_permits - stored_permits_to_spend;
long wait_micros = storedPermitsToWaitTime(this->stored_permits_,
stored_permits_to_spend) +
(long)(fresh_permits * this->stable_internal_micros_);
this->next_free_ticket_micros_ += wait_micros;
this->stored_permits_ -= stored_permits_to_spend;
return return_value;
}

/**
* The currently stored permits.
*/
double stored_permits_ = 0;
/**
* The maximum number of stored permits.
*/
double max_permits_ = 0;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable
* rate of 5 permits per second has a stable internal of 200ms.
*/
double stable_internal_micros_ = 0;
/**
* The time when the next request (no matter its size) will be granted. After
* granting a request, this is pushed further in the future. Large requests
* push this further than small requests.
*/
long next_free_ticket_micros_ = 0;
StephenRi marked this conversation as resolved.
Show resolved Hide resolved
};

class SmoothBurstyRateLimiter : public AbstractSmoothRateLimiter {
public:
SmoothBurstyRateLimiter(double permits_per_second) {
this->max_burst_seconds_ = 1.0;
async_simple::coro::syncAwait(setRate(permits_per_second));
}

protected:
void doSetRate(double permits_per_second, double stable_internal_micros) {
double old_max_permits = this->max_permits_;
this->max_permits_ = this->max_burst_seconds_ * permits_per_second;
this->stored_permits_ =
(0 == old_max_permits)
? 0
: this->stored_permits_ * this->max_permits_ / old_max_permits;
ELOG_DEBUG << "max_permits_: " << this->max_permits_
<< ", stored_permits_:" << this->stored_permits_;
}

long storedPermitsToWaitTime(double stored_permits, double permits_to_take) {
return 0L;
}

double coolDownInternalMicros() { return this->stable_internal_micros_; }

private:
/**
* The work(permits) of how many seconds can be saved up if the RateLimiter is
* unused.
*/
double max_burst_seconds_ = 0;
};
} // namespace coro_io
1 change: 1 addition & 0 deletions src/coro_io/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_executable(coro_io_test
test_corofile.cpp
test_channel.cpp
test_client_pool.cpp
test_rate_limiter.cpp
main.cpp
)
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_SYSTEM_NAME MATCHES "Windows") # mingw-w64
Expand Down
90 changes: 90 additions & 0 deletions src/coro_io/tests/test_rate_limiter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#include <async_simple/coro/Collect.h>
#include <doctest.h>

#include <ylt/coro_io/io_context_pool.hpp>
#include <ylt/coro_io/rate_limiter.hpp>

long currentTimeMills() {
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
std::chrono::milliseconds ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch());
return ms.count();
}

TEST_CASE("test SmoothBurstyRateLimiter simple") {
coro_io::SmoothBurstyRateLimiter rateLimiter(1);
double wait_time = async_simple::coro::syncAwait(rateLimiter.acquire(1));
CHECK_EQ(0, wait_time);
}

TEST_CASE("test SmoothBurstyRateLimiter acquire multi permits") {
double permits_per_second = 10.0;
int permits_to_acquire = 5;
double expected_cost = (permits_to_acquire) * (1 / permits_per_second);
double cost_diff = 0.1;

coro_io::SmoothBurstyRateLimiter rateLimiter(permits_per_second);

// acquire much enough, next acquire will wait
async_simple::coro::syncAwait(rateLimiter.acquire(permits_to_acquire));

long start_mills = currentTimeMills();
async_simple::coro::syncAwait(rateLimiter.acquire(1));
double cost = (currentTimeMills() - start_mills) / 1000.0;
CHECK(cost > expected_cost - cost_diff);
CHECK(cost < expected_cost + cost_diff);
}

TEST_CASE("test SmoothBurstyRateLimiter single thread") {
double permits_per_second = 10.0;
int permits_to_acquire = 5;
double expected_cost = (permits_to_acquire - 1) * (1 / permits_per_second);
double cost_diff = 0.1;

long start_mills = currentTimeMills();
coro_io::SmoothBurstyRateLimiter rateLimiter(permits_per_second);
for (int i = 0; i < permits_to_acquire; i++) {
double waitMills = async_simple::coro::syncAwait(rateLimiter.acquire(1));
ELOG_INFO << "wait for " << waitMills;
}
double cost = (currentTimeMills() - start_mills) / 1000.0;

CHECK(cost > expected_cost - cost_diff);
CHECK(cost < expected_cost + cost_diff);
}

TEST_CASE("test SmoothBurstyRateLimiter multi coroutine") {
double permits_per_second = 100.0;
int num_of_coroutine = 5;
int permits_to_acquire_every_coroutine = 5;
double expected_cost =
(num_of_coroutine * permits_to_acquire_every_coroutine - 1) *
(1 / permits_per_second);
double cost_diff = 0.1;

coro_io::SmoothBurstyRateLimiter rateLimiter(permits_per_second);
auto consumer = [&](int coroutine_num) -> async_simple::coro::Lazy<void> {
for (int i = 0; i < permits_to_acquire_every_coroutine; i++) {
co_await rateLimiter.acquire(1);
ELOG_INFO << "coroutine " << coroutine_num << " acquired";
}
};

auto consumerListLazy = [&]() -> async_simple::coro::Lazy<void> {
std::vector<async_simple::coro::Lazy<void>> lazyList;
for (int i = 0; i < num_of_coroutine; i++) {
lazyList.push_back(consumer(i));
}
co_await collectAllPara(std::move(lazyList));
};

long start_mills = currentTimeMills();
syncAwait(consumerListLazy().via(
coro_io::g_block_io_context_pool<coro_io::multithread_context_pool>(1)
.get_executor()));
double cost = (currentTimeMills() - start_mills) / 1000.0;

CHECK(cost > expected_cost - cost_diff);
CHECK(cost < expected_cost + cost_diff);
}