Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define delayed event ratelimit category #18019

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18019.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Define ratelimit configuration for delayed event management.
3 changes: 3 additions & 0 deletions demo/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ for port in 8080 8081 8082; do
per_user:
per_second: 1000
burst_count: 1000
rc_delayed_event:
per_second: 1000
burst_count: 1000
RC
)
echo "${ratelimiting}" >> "$port.config"
Expand Down
4 changes: 4 additions & 0 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ rc_invites:
per_second: 1000
burst_count: 1000

rc_delayed_event:
per_second: 9999
burst_count: 9999

federation_rr_transactions_per_room_per_second: 9999

allow_device_name_lookup_over_federation: true
Expand Down
19 changes: 19 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,25 @@ rc_federation:
concurrent: 5
```
---
### `rc_delayed_event`


Ratelimiting settings for delayed event management.

This option ratelimits attempts to restart, cancel, or view delayed events,
based on the account the client is using.
It defaults to: `per_second: 10`, `burst_count: 100`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the defaults so different from rc_message which is what is currently used? I expect the default for rc_delayed_event to be similar.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default is more lax to permit refreshing delayed events often / in a loop without risk of ratelimiting, whereas "actual" messages are sent less frequently / not periodically and benefit from a stricter ratelimit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewFerr could you put that justification in the config documentation? I think it would help sysadmins understand why they may want to configure this new ratelimit.


Attempts to create or send delayed events are ratelimited not by this setting, but by `rc_message`.

Example configuration:
```yaml
rc_delayed_event:
per_second: 5
burst_count: 50
```
---
### `federation_rr_transactions_per_room_per_second`

Sets outgoing federation transaction frequency for sending read-receipts,
Expand Down
6 changes: 6 additions & 0 deletions synapse/config/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
config.get("remote_media_download_burst_count", "500M")
),
)

self.rc_delayed_event = RatelimitSettings.parse(
config,
"rc_delayed_event",
defaults={"per_second": 10, "burst_count": 100},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 per second seems quite high - may as well not have a rate-limit at all at that point?

The default for rc_message is 0.2/second. How often are you expecting to refresh delayed events? Would it help to have a rate-limit per-device instead of just per-user?

)
11 changes: 8 additions & 3 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, hs: "HomeServer"):
self._config = hs.config
self._clock = hs.get_clock()
self._request_ratelimiter = hs.get_request_ratelimiter()
self._delayed_event_ratelimiter = hs.get_delayed_event_ratelimiter()
self._event_creation_handler = hs.get_event_creation_handler()
self._room_member_handler = hs.get_room_member_handler()

Expand Down Expand Up @@ -227,6 +228,8 @@ async def add(
Raises:
SynapseError: if the delayed event fails validation checks.
"""
# Use standard request limiter for scheduling new delayed events.
# TODO: Instead apply ratelimiting based on the scheduled send time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# TODO: Instead apply rateliming based on the scheduled send time.
# TODO: Instead apply rateliming based on the scheduled send time.
# See https://github.com/element-hq/synapse/issues/18021

await self._request_ratelimiter.ratelimit(requester)

self._event_creation_handler.validator.validate_builder(
Expand Down Expand Up @@ -285,7 +288,7 @@ async def cancel(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
await self._request_ratelimiter.ratelimit(requester)
await self._delayed_event_ratelimiter.ratelimit(requester)
await self._initialized_from_db

next_send_ts = await self._store.cancel_delayed_event(
Expand All @@ -308,7 +311,7 @@ async def restart(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
await self._request_ratelimiter.ratelimit(requester)
await self._delayed_event_ratelimiter.ratelimit(requester)
await self._initialized_from_db

next_send_ts = await self._store.restart_delayed_event(
Expand All @@ -332,6 +335,8 @@ async def send(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
# Use standard request limiter for sending delayed events on-demand,
# as an on-demand send is similar to sending a regular event.
await self._request_ratelimiter.ratelimit(requester)
await self._initialized_from_db

Expand Down Expand Up @@ -415,7 +420,7 @@ def _schedule_next_at(self, next_send_ts: Timestamp) -> None:

async def get_all_for_user(self, requester: Requester) -> List[JsonDict]:
"""Return all pending delayed events requested by the given user."""
await self._request_ratelimiter.ratelimit(requester)
await self._delayed_event_ratelimiter.ratelimit(requester)
return await self._store.get_all_delayed_events_for_user(
requester.user.localpart
)
Expand Down
8 changes: 8 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,14 @@ def get_request_ratelimiter(self) -> RequestRatelimiter:
self.config.ratelimiting.rc_admin_redaction,
)

@cache_in_self
def get_delayed_event_ratelimiter(self) -> Ratelimiter:
    """Return the ratelimiter applied to delayed-event management requests.

    Configured via the `rc_delayed_event` ratelimiting setting; backed by
    the main datastore so per-user overrides are honoured.
    """
    main_store = self.get_datastores().main
    return Ratelimiter(
        store=main_store,
        clock=self.get_clock(),
        cfg=self.config.ratelimiting.rc_delayed_event,
    )

@cache_in_self
def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
"""Usage metrics shared between phone home stats and the prometheus exporter."""
Expand Down
138 changes: 138 additions & 0 deletions tests/rest/client/test_delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,25 @@
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config(
    {"rc_delayed_event": {"per_second": 0.5, "burst_count": 1}}
)
def test_get_delayed_events_ratelimit(self) -> None:
    """Listing delayed events is ratelimited by `rc_delayed_event`, and a
    per-user ratelimit override exempts the user from the limit."""
    request_args = ("GET", PATH_PREFIX)

    # The first request fits within the burst allowance.
    channel = self.make_request(*request_args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

    # The second request exceeds the burst allowance and is limited.
    channel = self.make_request(*request_args)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the request isn't ratelimited anymore.
    channel = self.make_request(*request_args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_update_delayed_event_without_id(self) -> None:
channel = self.make_request(
"POST",
Expand Down Expand Up @@ -206,6 +225,44 @@
expect_code=HTTPStatus.NOT_FOUND,
)

@unittest.override_config(
    {"rc_delayed_event": {"per_second": 0.5, "burst_count": 1}}
)
def test_cancel_delayed_event_ratelimit(self) -> None:
    """Cancelling delayed events is ratelimited by `rc_delayed_event`, and a
    per-user ratelimit override exempts the user from the limit."""
    delay_ids = []
    # Schedule two delayed events to cancel later; the loop index is unused
    # (renamed `i` -> `_` to satisfy Ruff B007).
    for _ in range(2):
        channel = self.make_request(
            "POST",
            _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
            {},
        )
        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
        delay_id = channel.json_body.get("delay_id")
        self.assertIsNotNone(delay_id)
        delay_ids.append(delay_id)

    # The first cancellation fits within the burst allowance.
    channel = self.make_request(
        "POST",
        f"{PATH_PREFIX}/{delay_ids.pop(0)}",
        {"action": "cancel"},
    )
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

    # The second cancellation exceeds the burst allowance and is limited.
    args = (
        "POST",
        f"{PATH_PREFIX}/{delay_ids.pop(0)}",
        {"action": "cancel"},
    )
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the request isn't ratelimited anymore.
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_send_delayed_state_event(self) -> None:
state_key = "to_send_on_request"

Expand Down Expand Up @@ -250,6 +307,44 @@
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config({"rc_message": {"per_second": 3, "burst_count": 4}})
def test_send_delayed_event_ratelimit(self) -> None:
    """On-demand sending of delayed events is ratelimited by `rc_message`
    (not `rc_delayed_event`), and a per-user ratelimit override exempts the
    user from the limit."""
    delay_ids = []
    # Schedule two delayed events to send later; the loop index is unused
    # (renamed `i` -> `_` to satisfy Ruff B007).
    for _ in range(2):
        channel = self.make_request(
            "POST",
            _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
            {},
        )
        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
        delay_id = channel.json_body.get("delay_id")
        self.assertIsNotNone(delay_id)
        delay_ids.append(delay_id)

    # The first send fits within the burst allowance.
    channel = self.make_request(
        "POST",
        f"{PATH_PREFIX}/{delay_ids.pop(0)}",
        {"action": "send"},
    )
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

    # The second send exceeds the burst allowance and is limited.
    args = (
        "POST",
        f"{PATH_PREFIX}/{delay_ids.pop(0)}",
        {"action": "send"},
    )
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the request isn't ratelimited anymore.
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_restart_delayed_state_event(self) -> None:
state_key = "to_send_on_restarted_timeout"

Expand Down Expand Up @@ -309,6 +404,44 @@
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config(
    {"rc_delayed_event": {"per_second": 0.5, "burst_count": 1}}
)
def test_restart_delayed_event_ratelimit(self) -> None:
    """Restarting delayed events is ratelimited by `rc_delayed_event`, and a
    per-user ratelimit override exempts the user from the limit."""
    delay_ids = []
    # Schedule two delayed events to restart later; the loop index is unused
    # (renamed `i` -> `_` to satisfy Ruff B007).
    for _ in range(2):
        channel = self.make_request(
            "POST",
            _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
            {},
        )
        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
        delay_id = channel.json_body.get("delay_id")
        self.assertIsNotNone(delay_id)
        delay_ids.append(delay_id)

    # The first restart fits within the burst allowance.
    channel = self.make_request(
        "POST",
        f"{PATH_PREFIX}/{delay_ids.pop(0)}",
        {"action": "restart"},
    )
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

    # The second restart exceeds the burst allowance and is limited.
    args = (
        "POST",
        f"{PATH_PREFIX}/{delay_ids.pop(0)}",
        {"action": "restart"},
    )
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the request isn't ratelimited anymore.
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_delayed_state_events_are_cancelled_by_more_recent_state(self) -> None:
state_key = "to_be_cancelled"

Expand Down Expand Up @@ -374,3 +507,8 @@
room_id: str, event_type: str, state_key: str, delay_ms: int
) -> str:
return f"rooms/{room_id}/state/{event_type}/{state_key}?org.matrix.msc4140.delay={delay_ms}"

def _get_path_for_delayed_send(
room_id: str, event_type: str, delay_ms: int
) -> str:
return f"rooms/{room_id}/send/{event_type}?org.matrix.msc4140.delay={delay_ms}"
34 changes: 34 additions & 0 deletions tests/rest/client/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -2382,6 +2382,40 @@ def test_send_delayed_state_event(self) -> None:
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

@unittest.override_config(
    {
        "max_event_delay_duration": "24h",
        "rc_message": {"per_second": 1, "burst_count": 2},
    }
)
def test_add_delayed_event_ratelimit(self) -> None:
    """Test that requests to schedule new delayed events are ratelimited by a RateLimiter,
    which ratelimits them correctly, including by not limiting when the requester is
    exempt from ratelimiting.
    """
    request_args = (
        "POST",
        (
            "rooms/%s/send/m.room.message?org.matrix.msc4140.delay=2000"
            % self.room_id
        ).encode("ascii"),
        {"body": "test", "msgtype": "m.text"},
    )

    # The first request fits within the burst allowance; the second is limited.
    channel = self.make_request(*request_args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
    channel = self.make_request(*request_args)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the new delayed events aren't ratelimited anymore.
    channel = self.make_request(*request_args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)



class RoomSearchTestCase(unittest.HomeserverTestCase):
servlets = [
Expand Down
Loading