Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(internal): revert rust rate limiter [backport 2.10] #10229

Merged
merged 2 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddtrace/internal/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def _on_jsonify_context_started_flask(ctx):

The names of these events follow the pattern ``context.[started|ended].<context_name>``.
"""

from contextlib import contextmanager
import logging
import sys
Expand All @@ -115,7 +116,6 @@ def _on_jsonify_context_started_flask(ctx):

from ..utils.deprecations import DDTraceDeprecationWarning
from . import event_hub # noqa:F401
from ._core import RateLimiter # noqa:F401
from .event_hub import EventResultDict # noqa:F401
from .event_hub import dispatch
from .event_hub import dispatch_with_results # noqa:F401
Expand Down
49 changes: 0 additions & 49 deletions ddtrace/internal/core/_core.pyi
Original file line number Diff line number Diff line change
@@ -1,49 +0,0 @@
import typing

class RateLimiter:
"""
A token bucket rate limiter implementation
"""

rate_limit: int
time_window: float
effective_rate: float
current_window_rate: float
prev_window_rate: typing.Optional[float]
tokens: float
max_tokens: float
tokens_allowed: int
tokens_total: int
last_update_ns: float
current_window_ns: float

def __init__(self, rate_limit: int, time_window: float = 1e9):
"""
Constructor for RateLimiter

:param rate_limit: The rate limit to apply for number of requests per second.
rate limit > 0 max number of requests to allow per second,
rate limit == 0 to disallow all requests,
rate limit < 0 to allow all requests
:type rate_limit: :obj:`int`
:param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second.
:type time_window: :obj:`float`
"""
def is_allowed(self, timestamp_ns: typing.Optional[int] = None) -> bool:
"""
Check whether the current request is allowed or not

This method will also reduce the number of available tokens by 1

:param int timestamp_ns: timestamp in nanoseconds for the current request. [deprecated]
:returns: Whether the current request is allowed or not
:rtype: :obj:`bool`
"""
def _is_allowed(self, timestamp_ns: int) -> bool:
"""
Internal method to check whether the current request is allowed or not

:param int timestamp_ns: timestamp in nanoseconds for the current request.
:returns: Whether the current request is allowed or not
:rtype: :obj:`bool`
"""
156 changes: 153 additions & 3 deletions ddtrace/internal/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,174 @@

from ..internal import compat
from ..internal.constants import DEFAULT_SAMPLING_RATE_LIMIT
from .core import RateLimiter as _RateLimiter


class RateLimiter(_RateLimiter):
class RateLimiter(object):
"""
A token bucket rate limiter implementation
"""

__slots__ = (
"_lock",
"current_window_ns",
"time_window",
"last_update_ns",
"max_tokens",
"prev_window_rate",
"rate_limit",
"tokens",
"tokens_allowed",
"tokens_total",
)

def __init__(self, rate_limit: int, time_window: float = 1e9):
"""
Constructor for RateLimiter

:param rate_limit: The rate limit to apply for number of requests per second.
rate limit > 0 max number of requests to allow per second,
rate limit == 0 to disallow all requests,
rate limit < 0 to allow all requests
:type rate_limit: :obj:`int`
:param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second.
:type time_window: :obj:`float`
"""
self.rate_limit = rate_limit
self.time_window = time_window
self.tokens = rate_limit # type: float
self.max_tokens = rate_limit

self.last_update_ns = compat.monotonic_ns()

self.current_window_ns = 0 # type: float
self.tokens_allowed = 0
self.tokens_total = 0
self.prev_window_rate = None # type: Optional[float]

self._lock = threading.Lock()

@property
def _has_been_configured(self):
return self.rate_limit != DEFAULT_SAMPLING_RATE_LIMIT

def is_allowed(self, timestamp_ns: Optional[int] = None) -> bool:
"""
Check whether the current request is allowed or not

This method will also reduce the number of available tokens by 1

:param int timestamp_ns: timestamp in nanoseconds for the current request.
:returns: Whether the current request is allowed or not
:rtype: :obj:`bool`
"""
if timestamp_ns is not None:
deprecate(
"The `timestamp_ns` parameter is deprecated and will be removed in a future version."
"Ratelimiter will use the current time.",
category=DDTraceDeprecationWarning,
)

# rate limits are tested and mocked in pytest so we need to compute the timestamp here
# (or move the unit tests to rust)
return self._is_allowed(compat.monotonic_ns())
timestamp_ns = timestamp_ns or compat.monotonic_ns()
allowed = self._is_allowed(timestamp_ns)
# Update counts used to determine effective rate
self._update_rate_counts(allowed, timestamp_ns)
return allowed

def _update_rate_counts(self, allowed: bool, timestamp_ns: int) -> None:
# No tokens have been seen yet, start a new window
if not self.current_window_ns:
self.current_window_ns = timestamp_ns

# If more time than the configured time window
# has past since last window, reset
# DEV: We are comparing nanoseconds, so 1e9 is 1 second
elif timestamp_ns - self.current_window_ns >= self.time_window:
# Store previous window's rate to average with current for `.effective_rate`
self.prev_window_rate = self._current_window_rate()
self.tokens_allowed = 0
self.tokens_total = 0
self.current_window_ns = timestamp_ns

# Keep track of total tokens seen vs allowed
if allowed:
self.tokens_allowed += 1
self.tokens_total += 1

def _is_allowed(self, timestamp_ns: int) -> bool:
# Rate limit of 0 blocks everything
if self.rate_limit == 0:
return False

# Negative rate limit disables rate limiting
elif self.rate_limit < 0:
return True

# Lock, we need this to be thread safe, it should be shared by all threads
with self._lock:
self._replenish(timestamp_ns)

if self.tokens >= 1:
self.tokens -= 1
return True

return False

def _replenish(self, timestamp_ns: int) -> None:
try:
# If we are at the max, we do not need to add any more
if self.tokens == self.max_tokens:
return

# Add more available tokens based on how much time has passed
# DEV: We store as nanoseconds, convert to seconds
elapsed = (timestamp_ns - self.last_update_ns) / self.time_window
finally:
# always update the timestamp
# we can't update at the beginning of the function, since if we did, our calculation for
# elapsed would be incorrect
self.last_update_ns = timestamp_ns

# Update the number of available tokens, but ensure we do not exceed the max
self.tokens = min(
self.max_tokens,
self.tokens + (elapsed * self.rate_limit),
)

def _current_window_rate(self) -> float:
# No tokens have been seen, effectively 100% sample rate
# DEV: This is to avoid division by zero error
if not self.tokens_total:
return 1.0

# Get rate of tokens allowed
return self.tokens_allowed / self.tokens_total

@property
def effective_rate(self) -> float:
"""
Return the effective sample rate of this rate limiter

:returns: Effective sample rate value 0.0 <= rate <= 1.0
:rtype: :obj:`float``
"""
# If we have not had a previous window yet, return current rate
if self.prev_window_rate is None:
return self._current_window_rate()

return (self._current_window_rate() + self.prev_window_rate) / 2.0

def __repr__(self):
return "{}(rate_limit={!r}, tokens={!r}, last_update_ns={!r}, effective_rate={!r})".format(
self.__class__.__name__,
self.rate_limit,
self.tokens,
self.last_update_ns,
self.effective_rate,
)

__str__ = __repr__


class RateLimitExceeded(Exception):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
internal: Fix ``Already mutably borrowed`` error by reverting back to pure-python rate limiter.
3 changes: 0 additions & 3 deletions src/core/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
mod rate_limiter;

use pyo3::prelude::*;

#[pymodule]
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<rate_limiter::RateLimiterPy>()?;
Ok(())
}
Loading
Loading