From 2e92d7ad6906eca70c4cf2484c603e2c745ce9df Mon Sep 17 00:00:00 2001 From: Brett Langdon Date: Wed, 14 Aug 2024 13:20:47 -0400 Subject: [PATCH 1/2] fix(internal): revert rust rate limiter (#10225) Rust, just revert back to the original pure python rate limiter. The impact here is fairly low, the Rust rate limiter is much faster, but this code isn't the slow part of a hot path, better to be safe than fast. Fixes #10002 - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [ ] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) (cherry picked from commit 5e8c0c09fc68ffd353cbcf37c27f499b5f0639c3) --- ddtrace/internal/core/__init__.py | 2 +- ddtrace/internal/core/_core.pyi | 49 ----- ddtrace/internal/rate_limiter.py | 156 ++++++++++++++- ...rt-rust-rate-limiter-e61ca589aa24105b.yaml | 4 + src/core/lib.rs | 3 - src/core/rate_limiter.rs | 179 ------------------ 6 files changed, 158 insertions(+), 235 deletions(-) create mode 100644 releasenotes/notes/fix-revert-rust-rate-limiter-e61ca589aa24105b.yaml delete mode 100644 src/core/rate_limiter.rs diff --git a/ddtrace/internal/core/__init__.py b/ddtrace/internal/core/__init__.py index 0fe4cb1b9ef..249e099c7f2 100644 --- a/ddtrace/internal/core/__init__.py +++ b/ddtrace/internal/core/__init__.py @@ -100,6 +100,7 @@ def _on_jsonify_context_started_flask(ctx): The names of these events follow the pattern ``context.[started|ended].``. """ + from contextlib import contextmanager import logging import sys @@ -115,7 +116,6 @@ def _on_jsonify_context_started_flask(ctx): from ..utils.deprecations import DDTraceDeprecationWarning from . import event_hub # noqa:F401 -from ._core import RateLimiter # noqa:F401 from .event_hub import EventResultDict # noqa:F401 from .event_hub import dispatch from .event_hub import dispatch_with_results # noqa:F401 diff --git a/ddtrace/internal/core/_core.pyi b/ddtrace/internal/core/_core.pyi index 48ec6baf707..e69de29bb2d 100644 --- a/ddtrace/internal/core/_core.pyi +++ b/ddtrace/internal/core/_core.pyi @@ -1,49 +0,0 @@ -import typing - -class RateLimiter: - """ - A token bucket rate limiter implementation - """ - - rate_limit: int - time_window: float - effective_rate: float - current_window_rate: float - prev_window_rate: typing.Optional[float] - tokens: float - max_tokens: float - tokens_allowed: int - tokens_total: int - last_update_ns: float - current_window_ns: float - - def __init__(self, rate_limit: int, time_window: float = 1e9): - """ - Constructor for RateLimiter - - :param rate_limit: The rate limit to apply for number of requests per second. - rate limit > 0 max number of requests to allow per second, - rate limit == 0 to disallow all requests, - rate limit < 0 to allow all requests - :type rate_limit: :obj:`int` - :param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second. - :type time_window: :obj:`float` - """ - def is_allowed(self, timestamp_ns: typing.Optional[int] = None) -> bool: - """ - Check whether the current request is allowed or not - - This method will also reduce the number of available tokens by 1 - - :param int timestamp_ns: timestamp in nanoseconds for the current request. [deprecated] - :returns: Whether the current request is allowed or not - :rtype: :obj:`bool` - """ - def _is_allowed(self, timestamp_ns: int) -> bool: - """ - Internal method to check whether the current request is allowed or not - - :param int timestamp_ns: timestamp in nanoseconds for the current request. - :returns: Whether the current request is allowed or not - :rtype: :obj:`bool` - """ diff --git a/ddtrace/internal/rate_limiter.py b/ddtrace/internal/rate_limiter.py index cb25e3ed12f..cf04aa8bde8 100644 --- a/ddtrace/internal/rate_limiter.py +++ b/ddtrace/internal/rate_limiter.py @@ -13,24 +13,174 @@ from ..internal import compat from ..internal.constants import DEFAULT_SAMPLING_RATE_LIMIT -from .core import RateLimiter as _RateLimiter -class RateLimiter(_RateLimiter): +class RateLimiter(object): + """ + A token bucket rate limiter implementation + """ + + __slots__ = ( + "_lock", + "current_window_ns", + "time_window", + "last_update_ns", + "max_tokens", + "prev_window_rate", + "rate_limit", + "tokens", + "tokens_allowed", + "tokens_total", + ) + + def __init__(self, rate_limit: int, time_window: float = 1e9): + """ + Constructor for RateLimiter + + :param rate_limit: The rate limit to apply for number of requests per second. + rate limit > 0 max number of requests to allow per second, + rate limit == 0 to disallow all requests, + rate limit < 0 to allow all requests + :type rate_limit: :obj:`int` + :param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second. + :type time_window: :obj:`float` + """ + self.rate_limit = rate_limit + self.time_window = time_window + self.tokens = rate_limit # type: float + self.max_tokens = rate_limit + + self.last_update_ns = compat.monotonic_ns() + + self.current_window_ns = 0 # type: float + self.tokens_allowed = 0 + self.tokens_total = 0 + self.prev_window_rate = None # type: Optional[float] + + self._lock = threading.Lock() + @property def _has_been_configured(self): return self.rate_limit != DEFAULT_SAMPLING_RATE_LIMIT def is_allowed(self, timestamp_ns: Optional[int] = None) -> bool: + """ + Check whether the current request is allowed or not + + This method will also reduce the number of available tokens by 1 + + :param int timestamp_ns: timestamp in nanoseconds for the current request. + :returns: Whether the current request is allowed or not + :rtype: :obj:`bool` + """ if timestamp_ns is not None: deprecate( "The `timestamp_ns` parameter is deprecated and will be removed in a future version." "Ratelimiter will use the current time.", category=DDTraceDeprecationWarning, ) + # rate limits are tested and mocked in pytest so we need to compute the timestamp here # (or move the unit tests to rust) - return self._is_allowed(compat.monotonic_ns()) + timestamp_ns = timestamp_ns or compat.monotonic_ns() + allowed = self._is_allowed(timestamp_ns) + # Update counts used to determine effective rate + self._update_rate_counts(allowed, timestamp_ns) + return allowed + + def _update_rate_counts(self, allowed: bool, timestamp_ns: int) -> None: + # No tokens have been seen yet, start a new window + if not self.current_window_ns: + self.current_window_ns = timestamp_ns + + # If more time than the configured time window + # has past since last window, reset + # DEV: We are comparing nanoseconds, so 1e9 is 1 second + elif timestamp_ns - self.current_window_ns >= self.time_window: + # Store previous window's rate to average with current for `.effective_rate` + self.prev_window_rate = self._current_window_rate() + self.tokens_allowed = 0 + self.tokens_total = 0 + self.current_window_ns = timestamp_ns + + # Keep track of total tokens seen vs allowed + if allowed: + self.tokens_allowed += 1 + self.tokens_total += 1 + + def _is_allowed(self, timestamp_ns: int) -> bool: + # Rate limit of 0 blocks everything + if self.rate_limit == 0: + return False + + # Negative rate limit disables rate limiting + elif self.rate_limit < 0: + return True + + # Lock, we need this to be thread safe, it should be shared by all threads + with self._lock: + self._replenish(timestamp_ns) + + if self.tokens >= 1: + self.tokens -= 1 + return True + + return False + + def _replenish(self, timestamp_ns: int) -> None: + try: + # If we are at the max, we do not need to add any more + if self.tokens == self.max_tokens: + return + + # Add more available tokens based on how much time has passed + # DEV: We store as nanoseconds, convert to seconds + elapsed = (timestamp_ns - self.last_update_ns) / self.time_window + finally: + # always update the timestamp + # we can't update at the beginning of the function, since if we did, our calculation for + # elapsed would be incorrect + self.last_update_ns = timestamp_ns + + # Update the number of available tokens, but ensure we do not exceed the max + self.tokens = min( + self.max_tokens, + self.tokens + (elapsed * self.rate_limit), + ) + + def _current_window_rate(self) -> float: + # No tokens have been seen, effectively 100% sample rate + # DEV: This is to avoid division by zero error + if not self.tokens_total: + return 1.0 + + # Get rate of tokens allowed + return self.tokens_allowed / self.tokens_total + + @property + def effective_rate(self) -> float: + """ + Return the effective sample rate of this rate limiter + + :returns: Effective sample rate value 0.0 <= rate <= 1.0 + :rtype: :obj:`float`` + """ + # If we have not had a previous window yet, return current rate + if self.prev_window_rate is None: + return self._current_window_rate() + + return (self._current_window_rate() + self.prev_window_rate) / 2.0 + + def __repr__(self): + return "{}(rate_limit={!r}, tokens={!r}, last_update_ns={!r}, effective_rate={!r})".format( + self.__class__.__name__, + self.rate_limit, + self.tokens, + self.last_update_ns, + self.effective_rate, + ) + + __str__ = __repr__ class RateLimitExceeded(Exception): diff --git a/releasenotes/notes/fix-revert-rust-rate-limiter-e61ca589aa24105b.yaml b/releasenotes/notes/fix-revert-rust-rate-limiter-e61ca589aa24105b.yaml new file mode 100644 index 00000000000..93585c83bda --- /dev/null +++ b/releasenotes/notes/fix-revert-rust-rate-limiter-e61ca589aa24105b.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + internal: Fix ``Already mutably borrowed`` error by reverting back to pure-python rate limiter. diff --git a/src/core/lib.rs b/src/core/lib.rs index ceee6a484a8..ff3460e2596 100644 --- a/src/core/lib.rs +++ b/src/core/lib.rs @@ -1,9 +1,6 @@ -mod rate_limiter; - use pyo3::prelude::*; #[pymodule] fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { - m.add_class::()?; Ok(()) } diff --git a/src/core/rate_limiter.rs b/src/core/rate_limiter.rs deleted file mode 100644 index e4f83eda7ba..00000000000 --- a/src/core/rate_limiter.rs +++ /dev/null @@ -1,179 +0,0 @@ -use pyo3::prelude::*; -use std::sync::Arc; -use std::sync::Mutex; - -// Token bucket rate limiter -struct RateLimiter { - rate_limit: i32, - time_window: f64, - tokens: f64, - max_tokens: f64, - last_update_ns: f64, - current_window_ns: f64, - tokens_allowed: i32, - tokens_total: i32, - prev_window_rate: Option, -} - -impl RateLimiter { - pub fn new(rate_limit: i32, time_window: f64) -> RateLimiter { - RateLimiter { - rate_limit, - time_window, - tokens: rate_limit as f64, - max_tokens: rate_limit as f64, - last_update_ns: 0.0, - current_window_ns: 0.0, - tokens_allowed: 0, - tokens_total: 0, - prev_window_rate: None, - } - } - - pub fn _is_allowed(&mut self, timestamp_ns: f64) -> bool { - let allowed = (|| -> bool { - // Rate limit of 0 is always disallowed. Negative rate limits are always allowed. - match self.rate_limit { - 0 => return false, - _ if self.rate_limit < 0 => return true, - _ => {} - } - - if self.tokens < self.max_tokens { - let mut elapsed: f64 = (timestamp_ns - self.last_update_ns) / self.time_window; - if elapsed < 0.0 { - // Note - this should never happen, but if it does, we should reset the elapsed time to avoid negative tokens. - elapsed = 0.0 - } - self.tokens += elapsed * self.max_tokens; - if self.tokens > self.max_tokens { - self.tokens = self.max_tokens; - } - } - - self.last_update_ns = timestamp_ns; - - if self.tokens >= 1.0 { - self.tokens -= 1.0; - return true; - } - - false - })(); - - // If we are in a new window, update the window rate - if self.current_window_ns == 0.0 { - self.current_window_ns = timestamp_ns; - } else if timestamp_ns - self.current_window_ns >= self.time_window { - self.prev_window_rate = Some(self.current_window_rate()); - self.current_window_ns = timestamp_ns; - self.tokens_allowed = 0; - self.tokens_total = 0; - } - - // Update the token counts - self.tokens_total += 1; - if allowed { - self.tokens_allowed += 1; - } - - allowed - } - - pub fn effective_rate(&self) -> f64 { - let current_rate: f64 = self.current_window_rate(); - - if self.prev_window_rate.is_none() { - return current_rate; - } - - (current_rate + self.prev_window_rate.unwrap()) / 2.0 - } - - fn current_window_rate(&self) -> f64 { - // If no tokens have been seen then return 1.0 - // DEV: This is to avoid a division by zero error - if self.tokens_total == 0 { - return 1.0; - } - - self.tokens_allowed as f64 / self.tokens_total as f64 - } -} - -#[pyclass(name = "RateLimiter", subclass, module = "ddtrace.internal.core._core")] -pub struct RateLimiterPy { - rate_limiter: Arc>, -} - -#[pymethods] -impl RateLimiterPy { - #[new] - fn new(rate_limit: i32, time_window: Option) -> Self { - RateLimiterPy { - rate_limiter: Arc::new(Mutex::new(RateLimiter::new( - rate_limit, - time_window.unwrap_or(1e9), - ))), - } - } - - pub fn _is_allowed(&mut self, py: Python<'_>, timestamp_ns: f64) -> bool { - py.allow_threads(|| self.rate_limiter.lock().unwrap()._is_allowed(timestamp_ns)) - } - - #[getter] - pub fn effective_rate(&self) -> f64 { - self.rate_limiter.lock().unwrap().effective_rate() - } - - #[getter] - pub fn current_window_rate(&self) -> f64 { - self.rate_limiter.lock().unwrap().current_window_rate() - } - - #[getter] - pub fn rate_limit(&self) -> i32 { - self.rate_limiter.lock().unwrap().rate_limit - } - - #[getter] - pub fn time_window(&self) -> f64 { - self.rate_limiter.lock().unwrap().time_window - } - - #[getter] - pub fn tokens(&self) -> f64 { - self.rate_limiter.lock().unwrap().tokens - } - - #[getter] - pub fn max_tokens(&self) -> f64 { - self.rate_limiter.lock().unwrap().max_tokens - } - - #[getter] - pub fn last_update_ns(&self) -> f64 { - self.rate_limiter.lock().unwrap().last_update_ns - } - - #[getter] - pub fn current_window_ns(&self) -> f64 { - self.rate_limiter.lock().unwrap().current_window_ns - } - - #[getter] - pub fn prev_window_rate(&self) -> Option { - self.rate_limiter.lock().unwrap().prev_window_rate - } - - #[getter] - pub fn tokens_allowed(&self) -> i32 { - self.rate_limiter.lock().unwrap().tokens_allowed - } - - #[getter] - pub fn tokens_total(&self) -> i32 { - self.rate_limiter.lock().unwrap().tokens_total - } -} From 13f8719ee2f28d8345a434f85558e35b94dcdde0 Mon Sep 17 00:00:00 2001 From: brettlangdon Date: Wed, 14 Aug 2024 14:00:23 -0400 Subject: [PATCH 2/2] fix rust linting --- src/core/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib.rs b/src/core/lib.rs index ff3460e2596..0016cdfece6 100644 --- a/src/core/lib.rs +++ b/src/core/lib.rs @@ -1,6 +1,6 @@ use pyo3::prelude::*; #[pymodule] -fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { +fn _core(_: &Bound<'_, PyModule>) -> PyResult<()> { Ok(()) }