Add background job to clear unreferenced state groups #18154


Merged 46 commits on Feb 25, 2025
Commits
e6a3c46
Add background job to clear unreferenced state groups
devonh Feb 12, 2025
f9670ff
Add changelog entry
devonh Feb 12, 2025
9afe80b
Merge branch 'develop' into devon/unreferenced-bg
devonh Feb 12, 2025
20efdd2
Add test for unreferenced state group cleanup
devonh Feb 12, 2025
9c50123
Remove comments
devonh Feb 12, 2025
28679b6
Fix linter errors
devonh Feb 12, 2025
e606f42
Merge branch 'develop' into devon/unreferenced-bg
devonh Feb 12, 2025
977d83b
Order state_groups
devonh Feb 12, 2025
a487bcb
Update synapse/storage/controllers/purge_events.py
devonh Feb 13, 2025
69d72c2
Update synapse/storage/controllers/purge_events.py
devonh Feb 13, 2025
d8bfac4
Update synapse/storage/controllers/purge_events.py
devonh Feb 13, 2025
0955b7b
Update synapse/storage/controllers/purge_events.py
devonh Feb 13, 2025
ce87ba6
Change mark as pending deletion to do nothing on conflict
devonh Feb 13, 2025
3900791
Fix linter errors
devonh Feb 13, 2025
fe1df20
Move state group deletion job to background updates
devonh Feb 13, 2025
cc9e33b
Fix linter error
devonh Feb 13, 2025
801ca86
Pull over all the db calls since that's what it wants...
devonh Feb 13, 2025
ccb2158
Try OVERRIDING SYSTEM VALUE
devonh Feb 14, 2025
ca7ed76
Move OVERRIDING SYSTEM VALUE
devonh Feb 14, 2025
f45dcb1
Update synapse/storage/databases/state/bg_updates.py
devonh Feb 18, 2025
5e05af2
Update synapse/storage/databases/state/bg_updates.py
devonh Feb 18, 2025
7d1ce8d
Review comments & cleanup
devonh Feb 18, 2025
3c50f71
No string interpolation for sql
devonh Feb 18, 2025
7f611e0
Move background task to current schema version
devonh Feb 18, 2025
21dc067
Comment ignoring table port
devonh Feb 18, 2025
09a817f
Deduplicate find_unreferenced_groups
devonh Feb 18, 2025
042af6e
Don't reuse variables
devonh Feb 18, 2025
4cae2e5
Switch to not use single transaction
devonh Feb 18, 2025
ae367b2
Try casting
devonh Feb 18, 2025
977a8d8
Readd duplication
devonh Feb 19, 2025
d8f920b
Put it back in place
devonh Feb 19, 2025
6582fed
Put it back in place
devonh Feb 19, 2025
ecb8ed5
Fix linter error
devonh Feb 19, 2025
8eae7dd
Use multiple db pools
devonh Feb 19, 2025
8ef4a23
Remove duplication again
devonh Feb 19, 2025
dfa55a9
Lift logic to purge events controller
devonh Feb 19, 2025
02c2c87
Add IGNORED_BACKGROUND_UPDATES to port_db
devonh Feb 19, 2025
6851eaa
Fix error
devonh Feb 19, 2025
d1ca8c7
Update comment on ignoring state_groups_pending_deletion
devonh Feb 19, 2025
0f7c874
Try different sql
devonh Feb 19, 2025
35f15e7
Fixes
devonh Feb 19, 2025
89ec2a3
Update synapse/_scripts/synapse_port_db.py
devonh Feb 24, 2025
f5e59f2
Fix port_db syntax
devonh Feb 24, 2025
92d459d
Remove unnecessary code
devonh Feb 24, 2025
5f5f090
Only clear state groups up to max from first iteration
devonh Feb 24, 2025
8a479ee
Update synapse/storage/controllers/purge_events.py
devonh Feb 25, 2025
1 change: 1 addition & 0 deletions changelog.d/18154.feature
@@ -0,0 +1 @@
Add background job to clear unreferenced state groups.
2 changes: 1 addition & 1 deletion docs/development/database_schema.md
@@ -162,7 +162,7 @@ by a unique name, the current status (stored in JSON), and some dependency infor
* Whether the update requires a previous update to be complete.
* A rough ordering for which to complete updates.

-A new background updates needs to be added to the `background_updates` table:
+A new background update needs to be added to the `background_updates` table:

```sql
INSERT INTO background_updates (ordering, update_name, depends_on, progress_json) VALUES
30 changes: 30 additions & 0 deletions synapse/_scripts/synapse_port_db.py
@@ -191,6 +191,11 @@


IGNORED_TABLES = {
# Porting the auto generated sequence in this table is non-trivial.
# None of the entries in this list are mandatory for Synapse to keep working.
# If state group disk space is an issue after the port, the
# `delete_unreferenced_state_groups_bg_update` background task can be run again.
"state_groups_pending_deletion",
# We don't port these tables, as they're a faff and we can regenerate
# them anyway.
"user_directory",
@@ -216,6 +221,15 @@
}


# These background updates will not be applied upon creation of the postgres database.
IGNORED_BACKGROUND_UPDATES = {
# Reapplying this background update to the postgres database is unnecessary after
# already having waited for the SQLite database to complete all running background
# updates.
"delete_unreferenced_state_groups_bg_update",
}


# Error returned by the run function. Used at the top-level part of the script to
# handle errors and return codes.
end_error: Optional[str] = None
@@ -687,6 +701,20 @@ def _is_sqlite_autovacuum_enabled(txn: LoggingTransaction) -> bool:
# 0 means off. 1 means full. 2 means incremental.
return autovacuum_setting != 0

async def remove_ignored_background_updates_from_database(self) -> None:
def _remove_delete_unreferenced_state_groups_bg_updates(
txn: LoggingTransaction,
) -> None:
txn.execute(
"DELETE FROM background_updates WHERE update_name = ANY(?)",
(list(IGNORED_BACKGROUND_UPDATES),),
)

await self.postgres_store.db_pool.runInteraction(
"remove_delete_unreferenced_state_groups_bg_updates",
_remove_delete_unreferenced_state_groups_bg_updates,
)

async def run(self) -> None:
"""Ports the SQLite database to a PostgreSQL database.

@@ -732,6 +760,8 @@ async def run(self) -> None:
self.hs_config.database.get_single_database()
)

await self.remove_ignored_background_updates_from_database()

await self.run_background_updates_on_postgres()

self.progress.set_state("Creating port tables")
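
Note on the `synapse_port_db.py` hunks above: the new step deletes every row of `background_updates` whose name is in `IGNORED_BACKGROUND_UPDATES` before the remaining updates run on PostgreSQL. The following is a minimal sketch of that idea, not the PR's code. It assumes an in-memory SQLite database via the standard `sqlite3` module instead of the PostgreSQL pool, so it swaps the PostgreSQL-specific `= ANY(?)` for an `IN (...)` list with one placeholder per name. The two-column table layout, the helper names, and the `some_other_update` row are hypothetical.

```python
import sqlite3

# Same set as in the diff above.
IGNORED_BACKGROUND_UPDATES = {"delete_unreferenced_state_groups_bg_update"}


def remove_ignored_background_updates(conn: sqlite3.Connection) -> int:
    """Delete the ignored updates and return how many rows were removed."""
    names = sorted(IGNORED_BACKGROUND_UPDATES)
    # Only "?" placeholders are interpolated into the SQL text; the names
    # themselves are still passed as bound parameters.
    placeholders = ", ".join("?" for _ in names)
    cur = conn.execute(
        f"DELETE FROM background_updates WHERE update_name IN ({placeholders})",
        names,
    )
    return cur.rowcount


def demo() -> list:
    """Build a toy table (hypothetical schema), run the delete, list what is left."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE background_updates ("
        "update_name TEXT PRIMARY KEY, progress_json TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO background_updates VALUES (?, ?)",
        [
            ("delete_unreferenced_state_groups_bg_update", "{}"),
            ("some_other_update", "{}"),  # hypothetical unrelated update
        ],
    )
    remove_ignored_background_updates(conn)
    rows = conn.execute(
        "SELECT update_name FROM background_updates ORDER BY update_name"
    ).fetchall()
    return [row[0] for row in rows]


print(demo())  # prints ['some_other_update']
```

Only the ignored update is removed; any other pending update is left in place.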
246 changes: 183 additions & 63 deletions synapse/storage/controllers/purge_events.py
@@ -21,11 +21,18 @@

import itertools
import logging
from typing import TYPE_CHECKING, Collection, Mapping, Set
from typing import (
TYPE_CHECKING,
Collection,
Mapping,
Set,
)

from synapse.logging.context import nested_logging_context
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases import Databases
from synapse.types.storage import _BackgroundUpdates

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -44,6 +51,11 @@ def __init__(self, hs: "HomeServer", stores: Databases):
self._delete_state_groups_loop, 60 * 1000
)

self.stores.state.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE,
self._background_delete_unrefereneced_state_groups,
)

async def purge_room(self, room_id: str) -> None:
"""Deletes all record of a room"""

@@ -80,68 +92,6 @@ async def purge_history(
sg_to_delete
)

async def _find_unreferenced_groups(
self, state_groups: Collection[int]
) -> Set[int]:
"""Used when purging history to figure out which state groups can be
deleted.

Args:
state_groups: Set of state groups referenced by events
that are going to be deleted.

Returns:
The set of state groups that can be deleted.
"""
# Set of events that we have found to be referenced by events
referenced_groups = set()

# Set of state groups we've already seen
state_groups_seen = set(state_groups)

# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search

referenced = await self.stores.main.get_referenced_state_groups(
current_search
)
referenced_groups |= referenced

# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced

edges = await self.stores.state.get_previous_state_groups(current_search)

prevs = set(edges.values())
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs

# We also check to see if anything referencing the state groups are
# also unreferenced. This helps ensure that we delete unreferenced
# state groups, if we don't then we will de-delta them when we
# delete the other state groups leading to increased DB usage.
next_edges = await self.stores.state.get_next_state_groups(current_search)
nexts = set(next_edges.keys())
nexts -= state_groups_seen
next_to_search |= nexts
state_groups_seen |= nexts

to_delete = state_groups_seen - referenced_groups

return to_delete

@wrap_as_background_process("_delete_state_groups_loop")
async def _delete_state_groups_loop(self) -> None:
"""Background task that deletes any state groups that may be pending
@@ -203,3 +153,173 @@ async def _delete_state_groups(
room_id,
groups_to_sequences,
)

async def _background_delete_unrefereneced_state_groups(
self, progress: dict, batch_size: int
) -> int:
"""This background update will slowly delete any unreferenced state groups"""

last_checked_state_group = progress.get("last_checked_state_group")
max_state_group = progress.get("max_state_group")

if last_checked_state_group is None or max_state_group is None:
# This is the first run.
last_checked_state_group = 0

max_state_group = await self.stores.state.db_pool.simple_select_one_onecol(
table="state_groups",
keyvalues={},
retcol="MAX(id)",
allow_none=True,
desc="get_max_state_group",
)
if max_state_group is None:
# There are no state groups so the background process is finished.
await self.stores.state.db_pool.updates._end_background_update(
_BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
)
return batch_size

(
last_checked_state_group,
final_batch,
) = await self._delete_unreferenced_state_groups_batch(
last_checked_state_group, batch_size, max_state_group
)

if not final_batch:
# There are more state groups to check.
progress = {
"last_checked_state_group": last_checked_state_group,
"max_state_group": max_state_group,
}
await self.stores.state.db_pool.updates._background_update_progress(
_BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE,
progress,
)
else:
# This background process is finished.
await self.stores.state.db_pool.updates._end_background_update(
_BackgroundUpdates.DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
)

return batch_size

async def _delete_unreferenced_state_groups_batch(
self,
last_checked_state_group: int,
batch_size: int,
max_state_group: int,
) -> tuple[int, bool]:
"""Looks for unreferenced state groups starting from the last state group
checked, and any state groups which would become unreferenced if a state group
was deleted, and marks them for deletion.

Args:
last_checked_state_group: The last state group that was checked.
batch_size: How many state groups to process in this iteration.
max_state_group: The highest state group ID to check, fixed on the first iteration.

Returns:
(last_checked_state_group, final_batch)
"""

# Look for state groups that can be cleaned up.
def get_next_state_groups_txn(txn: LoggingTransaction) -> Set[int]:
state_group_sql = "SELECT id FROM state_groups WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
txn.execute(
state_group_sql, (last_checked_state_group, max_state_group, batch_size)
)

next_set = {row[0] for row in txn}

return next_set

next_set = await self.stores.state.db_pool.runInteraction(
"get_next_state_groups", get_next_state_groups_txn
)

final_batch = False
if len(next_set) < batch_size:
final_batch = True
else:
last_checked_state_group = max(next_set)

if len(next_set) == 0:
return last_checked_state_group, final_batch

# Find all state groups that can be deleted if the original set is deleted.
# This set includes the original set, as well as any state groups that would
# become unreferenced upon deleting the original set.
to_delete = await self._find_unreferenced_groups(next_set)

if len(to_delete) == 0:
return last_checked_state_group, final_batch

await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
to_delete
)

return last_checked_state_group, final_batch
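
Note on the batching above: each iteration scans IDs in `(last_checked_state_group, max_state_group]`, at most `batch_size` at a time, and treats a short batch as the final one. The sketch below simulates only that cursor logic over an in-memory list, assuming `batch_size >= 1`. The helper names `next_batch` and `run_to_completion` are hypothetical, and the list comprehension stands in for the SQL query. It performs no deletion.

```python
from typing import Dict, List, Tuple


def next_batch(
    ids: List[int], last_checked: int, max_id: int, batch_size: int
) -> Tuple[int, bool, List[int]]:
    """Stand-in for the SELECT in the diff, over an in-memory list of IDs."""
    # Equivalent of: WHERE ? < id AND id <= ? ORDER BY id LIMIT ?
    batch = sorted(i for i in ids if last_checked < i <= max_id)[:batch_size]
    # A batch shorter than batch_size means the scan has reached max_id.
    final_batch = len(batch) < batch_size
    if not final_batch:
        last_checked = max(batch)
    return last_checked, final_batch, batch


def run_to_completion(ids: List[int], batch_size: int) -> List[List[int]]:
    """Call next_batch until it reports the final batch; return every batch."""
    progress: Dict[str, int] = {}
    batches: List[List[int]] = []
    while True:
        last_checked = progress.get("last_checked_state_group")
        max_id = progress.get("max_state_group")
        if last_checked is None or max_id is None:
            # First run: start from 0 and fix the upper bound once.
            if not ids:
                return batches
            last_checked, max_id = 0, max(ids)
        last_checked, final_batch, batch = next_batch(
            ids, last_checked, max_id, batch_size
        )
        batches.append(batch)
        if final_batch:
            return batches
        progress = {
            "last_checked_state_group": last_checked,
            "max_state_group": max_id,
        }


print(run_to_completion([1, 2, 3, 5, 8, 13], 2))
# prints [[1, 2], [3, 5], [8, 13], []]
```

When the number of IDs is an exact multiple of `batch_size`, the last full batch is not recognised as final, so one extra empty iteration runs before the update ends. Because the upper bound is captured once, IDs created after the first iteration fall outside the scan.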

async def _find_unreferenced_groups(
self,
state_groups: Collection[int],
) -> Set[int]:
"""Used when purging history to figure out which state groups can be
deleted.

Args:
state_groups: Set of state groups referenced by events
that are going to be deleted.

Returns:
The set of state groups that can be deleted.
"""
# Set of state groups that we have found to be referenced by events
referenced_groups = set()

# Set of state groups we've already seen
state_groups_seen = set(state_groups)

# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search

referenced = await self.stores.main.get_referenced_state_groups(
current_search
)
referenced_groups |= referenced

# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced

edges = await self.stores.state.get_previous_state_groups(current_search)

prevs = set(edges.values())
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs

# We also check to see if anything referencing the state groups are
# also unreferenced. This helps ensure that we delete unreferenced
# state groups, if we don't then we will de-delta them when we
# delete the other state groups leading to increased DB usage.
next_edges = await self.stores.state.get_next_state_groups(current_search)
nexts = set(next_edges.keys())
nexts -= state_groups_seen
next_to_search |= nexts
state_groups_seen |= nexts

to_delete = state_groups_seen - referenced_groups

return to_delete
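
Note on `_find_unreferenced_groups`: the walk above can be tried out on a toy graph. The sketch below is an assumption-laden stand-in, not Synapse code: the three store calls are replaced by a plain set (`referenced_by_events`) and a dict (`prev_of`, mapping a state group to the group it is a delta of), and both parameter names are hypothetical.

```python
import itertools
from typing import Collection, Dict, Set


def find_unreferenced_groups(
    state_groups: Collection[int],
    referenced_by_events: Set[int],
    prev_of: Dict[int, int],
) -> Set[int]:
    """In-memory version of the graph walk shown in the diff above."""
    referenced_groups: Set[int] = set()
    state_groups_seen = set(state_groups)
    next_to_search = set(state_groups)

    while next_to_search:
        # Handle at most 100 groups per round, as the original does.
        current_search = set(itertools.islice(next_to_search, 100))
        next_to_search -= current_search

        # Stand-in for get_referenced_state_groups.
        referenced = current_search & referenced_by_events
        referenced_groups |= referenced
        # Do not walk further from groups that are still referenced.
        current_search -= referenced

        # Stand-in for get_previous_state_groups: follow delta edges backwards.
        prevs = {prev_of[sg] for sg in current_search if sg in prev_of}
        prevs -= state_groups_seen
        next_to_search |= prevs
        state_groups_seen |= prevs

        # Stand-in for get_next_state_groups: groups built on the current ones.
        nexts = {sg for sg, prev in prev_of.items() if prev in current_search}
        nexts -= state_groups_seen
        next_to_search |= nexts
        state_groups_seen |= nexts

    return state_groups_seen - referenced_groups


# Toy graph: 2 is a delta of 1; 3 and 4 are deltas of 2; only 3 is referenced.
print(find_unreferenced_groups({1}, {3}, {2: 1, 3: 2, 4: 2}))
# prints {1, 2, 4}
```

Starting from group 1, the walk reaches 2, 3 and 4 through the forward edges. Group 3 is kept because an event references it, while 1, 2 and 4 are returned as deletable.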
10 changes: 9 additions & 1 deletion synapse/storage/databases/state/bg_updates.py
@@ -20,7 +20,15 @@
#

import logging
from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
from typing import (
TYPE_CHECKING,
Dict,
List,
Mapping,
Optional,
Tuple,
Union,
)

from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore