Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define delayed event ratelimit category #18019

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18019.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Define ratelimit configuration for delayed event management.
3 changes: 3 additions & 0 deletions demo/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ for port in 8080 8081 8082; do
per_user:
per_second: 1000
burst_count: 1000
rc_delayed_event_mgmt:
per_second: 1000
burst_count: 1000
RC
)
echo "${ratelimiting}" >> "$port.config"
Expand Down
4 changes: 4 additions & 0 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ rc_presence:
per_second: 9999
burst_count: 9999

rc_delayed_event_mgmt:
per_second: 9999
burst_count: 9999

federation_rr_transactions_per_room_per_second: 9999

allow_device_name_lookup_over_federation: true
Expand Down
23 changes: 23 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -1947,6 +1947,29 @@ rc_presence:
burst_count: 1
```
---
### `rc_delayed_event_mgmt`

Ratelimiting settings for delayed event management.

This option ratelimits attempts to restart, cancel, or view delayed events,
keyed on the sending client's account and device ID.
It defaults to: `per_second: 1`, `burst_count: 5`.

Attempts to create or send delayed events are ratelimited not by this setting, but by `rc_message`.

Setting this to a high value allows clients to make delayed event management requests often
(such as repeatedly restarting a delayed event with a short timeout,
or restarting several different delayed events all at once)
without the risk of being ratelimited.

Example configuration:
```yaml
rc_delayed_event_mgmt:
per_second: 2
burst_count: 20
```
---
### `federation_rr_transactions_per_room_per_second`

Sets outgoing federation transaction frequency for sending read-receipts,
Expand Down
6 changes: 6 additions & 0 deletions synapse/config/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
"rc_presence.per_user",
defaults={"per_second": 0.1, "burst_count": 1},
)

self.rc_delayed_event_mgmt = RatelimitSettings.parse(
config,
"rc_delayed_event_mgmt",
defaults={"per_second": 1, "burst_count": 5},
)
32 changes: 28 additions & 4 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.opentracing import set_tag
from synapse.metrics import event_processing_positions
Expand Down Expand Up @@ -57,10 +58,19 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()
self._config = hs.config
self._clock = hs.get_clock()
self._request_ratelimiter = hs.get_request_ratelimiter()
self._event_creation_handler = hs.get_event_creation_handler()
self._room_member_handler = hs.get_room_member_handler()

self._request_ratelimiter = hs.get_request_ratelimiter()

# Ratelimiter for management of existing delayed events,
# keyed by the sending user ID & device ID.
self._delayed_event_mgmt_ratelimiter = Ratelimiter(
store=self._store,
clock=self._clock,
cfg=self._config.ratelimiting.rc_delayed_event_mgmt,
)

self._next_delayed_event_call: Optional[IDelayedCall] = None

# The current position in the current_state_delta stream
Expand Down Expand Up @@ -227,6 +237,9 @@ async def add(
Raises:
SynapseError: if the delayed event fails validation checks.
"""
# Use standard request limiter for scheduling new delayed events.
# TODO: Instead apply ratelimiting based on the scheduled send time.
# See https://github.com/element-hq/synapse/issues/18021
await self._request_ratelimiter.ratelimit(requester)

self._event_creation_handler.validator.validate_builder(
Expand Down Expand Up @@ -285,7 +298,10 @@ async def cancel(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
await self._request_ratelimiter.ratelimit(requester)
await self._delayed_event_mgmt_ratelimiter.ratelimit(
requester,
(requester.user.to_string(), requester.device_id),
)
await self._initialized_from_db

next_send_ts = await self._store.cancel_delayed_event(
Expand All @@ -308,7 +324,10 @@ async def restart(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
await self._request_ratelimiter.ratelimit(requester)
await self._delayed_event_mgmt_ratelimiter.ratelimit(
requester,
(requester.user.to_string(), requester.device_id),
)
await self._initialized_from_db

next_send_ts = await self._store.restart_delayed_event(
Expand All @@ -332,6 +351,8 @@ async def send(self, requester: Requester, delay_id: str) -> None:
NotFoundError: if no matching delayed event could be found.
"""
assert self._is_master
# Use standard request limiter for sending delayed events on-demand,
# as an on-demand send is similar to sending a regular event.
await self._request_ratelimiter.ratelimit(requester)
await self._initialized_from_db

Expand Down Expand Up @@ -415,7 +436,10 @@ def _schedule_next_at(self, next_send_ts: Timestamp) -> None:

async def get_all_for_user(self, requester: Requester) -> List[JsonDict]:
    """Return all pending delayed events requested by the given user.

    Management reads are ratelimited per (user ID, device ID) via the
    dedicated delayed-event management ratelimiter.
    """
    user = requester.user
    await self._delayed_event_mgmt_ratelimiter.ratelimit(
        requester, (user.to_string(), requester.device_id)
    )
    return await self._store.get_all_delayed_events_for_user(user.localpart)
Expand Down
143 changes: 143 additions & 0 deletions tests/rest/client/test_delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,27 @@ def test_delayed_state_events_are_sent_on_timeout(self) -> None:
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config(
    {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
)
def test_get_delayed_events_ratelimit(self) -> None:
    """Listing delayed events is ratelimited by rc_delayed_event_mgmt, and a
    per-user ratelimit override lifts the limit."""

    def list_delayed_events():
        return self.make_request("GET", PATH_PREFIX)

    # The first request fits within the burst; the second exceeds it.
    resp = list_delayed_events()
    self.assertEqual(HTTPStatus.OK, resp.code, resp.result)

    resp = list_delayed_events()
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, resp.code, resp.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    store = self.hs.get_datastores().main
    self.get_success(store.set_ratelimit_for_user(self.user_id, 0, 0))

    # Test that the request isn't ratelimited anymore.
    resp = list_delayed_events()
    self.assertEqual(HTTPStatus.OK, resp.code, resp.result)

def test_update_delayed_event_without_id(self) -> None:
channel = self.make_request(
"POST",
Expand Down Expand Up @@ -206,6 +227,46 @@ def test_cancel_delayed_state_event(self) -> None:
expect_code=HTTPStatus.NOT_FOUND,
)

@unittest.override_config(
    {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
)
def test_cancel_delayed_event_ratelimit(self) -> None:
    """Cancelling delayed events is ratelimited by rc_delayed_event_mgmt, and a
    per-user ratelimit override lifts the limit."""
    # Schedule two delayed events far enough out that they stay pending.
    pending = []
    for _ in range(2):
        channel = self.make_request(
            "POST",
            _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
            {},
        )
        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
        delay_id = channel.json_body.get("delay_id")
        self.assertIsNotNone(delay_id)
        pending.append(delay_id)

    first_id, second_id = pending

    # The first cancel fits within the burst...
    channel = self.make_request(
        "POST", f"{PATH_PREFIX}/{first_id}", {"action": "cancel"}
    )
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

    # ...but the second one is ratelimited.
    args = ("POST", f"{PATH_PREFIX}/{second_id}", {"action": "cancel"})
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the request isn't ratelimited anymore.
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_send_delayed_state_event(self) -> None:
state_key = "to_send_on_request"

Expand Down Expand Up @@ -250,6 +311,44 @@ def test_send_delayed_state_event(self) -> None:
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config({"rc_message": {"per_second": 3.5, "burst_count": 4}})
def test_send_delayed_event_ratelimit(self) -> None:
    """On-demand sending of delayed events is ratelimited by rc_message (not
    rc_delayed_event_mgmt), and a per-user ratelimit override lifts the limit."""
    # Schedule two delayed events far enough out that they stay pending.
    pending = []
    for _ in range(2):
        channel = self.make_request(
            "POST",
            _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
            {},
        )
        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
        delay_id = channel.json_body.get("delay_id")
        self.assertIsNotNone(delay_id)
        pending.append(delay_id)

    first_id, second_id = pending

    # The first on-demand send fits within the burst...
    channel = self.make_request(
        "POST", f"{PATH_PREFIX}/{first_id}", {"action": "send"}
    )
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

    # ...but the second one is ratelimited.
    args = ("POST", f"{PATH_PREFIX}/{second_id}", {"action": "send"})
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the request isn't ratelimited anymore.
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_restart_delayed_state_event(self) -> None:
state_key = "to_send_on_restarted_timeout"

Expand Down Expand Up @@ -309,6 +408,46 @@ def test_restart_delayed_state_event(self) -> None:
)
self.assertEqual(setter_expected, content.get(setter_key), content)

@unittest.override_config(
    {"rc_delayed_event_mgmt": {"per_second": 0.5, "burst_count": 1}}
)
def test_restart_delayed_event_ratelimit(self) -> None:
    """Restarting delayed events is ratelimited by rc_delayed_event_mgmt, and a
    per-user ratelimit override lifts the limit."""
    # Schedule two delayed events far enough out that they stay pending.
    pending = []
    for _ in range(2):
        channel = self.make_request(
            "POST",
            _get_path_for_delayed_send(self.room_id, _EVENT_TYPE, 100000),
            {},
        )
        self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
        delay_id = channel.json_body.get("delay_id")
        self.assertIsNotNone(delay_id)
        pending.append(delay_id)

    first_id, second_id = pending

    # The first restart fits within the burst...
    channel = self.make_request(
        "POST", f"{PATH_PREFIX}/{first_id}", {"action": "restart"}
    )
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

    # ...but the second one is ratelimited.
    args = ("POST", f"{PATH_PREFIX}/{second_id}", {"action": "restart"})
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the request isn't ratelimited anymore.
    channel = self.make_request(*args)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

def test_delayed_state_events_are_cancelled_by_more_recent_state(self) -> None:
state_key = "to_be_cancelled"

Expand Down Expand Up @@ -374,3 +513,7 @@ def _get_path_for_delayed_state(
room_id: str, event_type: str, state_key: str, delay_ms: int
) -> str:
return f"rooms/{room_id}/state/{event_type}/{state_key}?org.matrix.msc4140.delay={delay_ms}"


def _get_path_for_delayed_send(room_id: str, event_type: str, delay_ms: int) -> str:
return f"rooms/{room_id}/send/{event_type}?org.matrix.msc4140.delay={delay_ms}"
35 changes: 35 additions & 0 deletions tests/rest/client/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,41 @@ def test_send_delayed_state_event(self) -> None:
)
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

@unittest.override_config(
    {
        "max_event_delay_duration": "24h",
        "rc_message": {"per_second": 1, "burst_count": 2},
    }
)
def test_add_delayed_event_ratelimit(self) -> None:
    """Test that requests to schedule new delayed events are ratelimited by a RateLimiter,
    which ratelimits them correctly, including by not limiting when the requester is
    exempt from ratelimiting.
    """
    path = (
        f"rooms/{self.room_id}/send/m.room.message?org.matrix.msc4140.delay=2000"
    ).encode("ascii")
    body = {"body": "test", "msgtype": "m.text"}

    # Test that new delayed events are correctly ratelimited.
    channel = self.make_request("POST", path, body)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)

    channel = self.make_request("POST", path, body)
    self.assertEqual(HTTPStatus.TOO_MANY_REQUESTS, channel.code, channel.result)

    # Add the current user to the ratelimit overrides, allowing them no ratelimiting.
    self.get_success(
        self.hs.get_datastores().main.set_ratelimit_for_user(self.user_id, 0, 0)
    )

    # Test that the new delayed events aren't ratelimited anymore.
    channel = self.make_request("POST", path, body)
    self.assertEqual(HTTPStatus.OK, channel.code, channel.result)


class RoomSearchTestCase(unittest.HomeserverTestCase):
servlets = [
Expand Down
Loading