21
21
22
22
import itertools
23
23
import logging
24
- from typing import (
25
- TYPE_CHECKING ,
26
- Collection ,
27
- Mapping ,
28
- Set ,
29
- )
24
+ from typing import TYPE_CHECKING , Collection , Mapping , Set
30
25
31
26
from synapse .logging .context import nested_logging_context
32
27
from synapse .metrics .background_process_metrics import wrap_as_background_process
33
- from synapse .storage .database import LoggingTransaction
34
28
from synapse .storage .databases import Databases
35
- from synapse .types .storage import _BackgroundUpdates
36
29
37
30
if TYPE_CHECKING :
38
31
from synapse .server import HomeServer
@@ -51,11 +44,6 @@ def __init__(self, hs: "HomeServer", stores: Databases):
51
44
self ._delete_state_groups_loop , 60 * 1000
52
45
)
53
46
54
- self .stores .state .db_pool .updates .register_background_update_handler (
55
- _BackgroundUpdates .DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE ,
56
- self ._background_delete_unrefereneced_state_groups ,
57
- )
58
-
59
47
async def purge_room (self , room_id : str ) -> None :
60
48
"""Deletes all record of a room"""
61
49
@@ -92,6 +80,68 @@ async def purge_history(
92
80
sg_to_delete
93
81
)
94
82
83
+ async def _find_unreferenced_groups (
84
+ self , state_groups : Collection [int ]
85
+ ) -> Set [int ]:
86
+ """Used when purging history to figure out which state groups can be
87
+ deleted.
88
+
89
+ Args:
90
+ state_groups: Set of state groups referenced by events
91
+ that are going to be deleted.
92
+
93
+ Returns:
94
+ The set of state groups that can be deleted.
95
+ """
96
+ # Set of events that we have found to be referenced by events
97
+ referenced_groups = set ()
98
+
99
+ # Set of state groups we've already seen
100
+ state_groups_seen = set (state_groups )
101
+
102
+ # Set of state groups to handle next.
103
+ next_to_search = set (state_groups )
104
+ while next_to_search :
105
+ # We bound size of groups we're looking up at once, to stop the
106
+ # SQL query getting too big
107
+ if len (next_to_search ) < 100 :
108
+ current_search = next_to_search
109
+ next_to_search = set ()
110
+ else :
111
+ current_search = set (itertools .islice (next_to_search , 100 ))
112
+ next_to_search -= current_search
113
+
114
+ referenced = await self .stores .main .get_referenced_state_groups (
115
+ current_search
116
+ )
117
+ referenced_groups |= referenced
118
+
119
+ # We don't continue iterating up the state group graphs for state
120
+ # groups that are referenced.
121
+ current_search -= referenced
122
+
123
+ edges = await self .stores .state .get_previous_state_groups (current_search )
124
+
125
+ prevs = set (edges .values ())
126
+ # We don't bother re-handling groups we've already seen
127
+ prevs -= state_groups_seen
128
+ next_to_search |= prevs
129
+ state_groups_seen |= prevs
130
+
131
+ # We also check to see if anything referencing the state groups are
132
+ # also unreferenced. This helps ensure that we delete unreferenced
133
+ # state groups, if we don't then we will de-delta them when we
134
+ # delete the other state groups leading to increased DB usage.
135
+ next_edges = await self .stores .state .get_next_state_groups (current_search )
136
+ nexts = set (next_edges .keys ())
137
+ nexts -= state_groups_seen
138
+ next_to_search |= nexts
139
+ state_groups_seen |= nexts
140
+
141
+ to_delete = state_groups_seen - referenced_groups
142
+
143
+ return to_delete
144
+
95
145
@wrap_as_background_process ("_delete_state_groups_loop" )
96
146
async def _delete_state_groups_loop (self ) -> None :
97
147
"""Background task that deletes any state groups that may be pending
@@ -153,173 +203,3 @@ async def _delete_state_groups(
153
203
room_id ,
154
204
groups_to_sequences ,
155
205
)
156
-
157
- async def _background_delete_unrefereneced_state_groups (
158
- self , progress : dict , batch_size : int
159
- ) -> int :
160
- """This background update will slowly delete any unreferenced state groups"""
161
-
162
- last_checked_state_group = progress .get ("last_checked_state_group" )
163
- max_state_group = progress .get ("max_state_group" )
164
-
165
- if last_checked_state_group is None or max_state_group is None :
166
- # This is the first run.
167
- last_checked_state_group = 0
168
-
169
- max_state_group = await self .stores .state .db_pool .simple_select_one_onecol (
170
- table = "state_groups" ,
171
- keyvalues = {},
172
- retcol = "MAX(id)" ,
173
- allow_none = True ,
174
- desc = "get_max_state_group" ,
175
- )
176
- if max_state_group is None :
177
- # There are no state groups so the background process is finished.
178
- await self .stores .state .db_pool .updates ._end_background_update (
179
- _BackgroundUpdates .DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
180
- )
181
- return batch_size
182
-
183
- (
184
- last_checked_state_group ,
185
- final_batch ,
186
- ) = await self ._delete_unreferenced_state_groups_batch (
187
- last_checked_state_group , batch_size , max_state_group
188
- )
189
-
190
- if not final_batch :
191
- # There are more state groups to check.
192
- progress = {
193
- "last_checked_state_group" : last_checked_state_group ,
194
- "max_state_group" : max_state_group ,
195
- }
196
- await self .stores .state .db_pool .updates ._background_update_progress (
197
- _BackgroundUpdates .DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE ,
198
- progress ,
199
- )
200
- else :
201
- # This background process is finished.
202
- await self .stores .state .db_pool .updates ._end_background_update (
203
- _BackgroundUpdates .DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
204
- )
205
-
206
- return batch_size
207
-
208
- async def _delete_unreferenced_state_groups_batch (
209
- self ,
210
- last_checked_state_group : int ,
211
- batch_size : int ,
212
- max_state_group : int ,
213
- ) -> tuple [int , bool ]:
214
- """Looks for unreferenced state groups starting from the last state group
215
- checked, and any state groups which would become unreferenced if a state group
216
- was deleted, and marks them for deletion.
217
-
218
- Args:
219
- last_checked_state_group: The last state group that was checked.
220
- batch_size: How many state groups to process in this iteration.
221
-
222
- Returns:
223
- (last_checked_state_group, final_batch)
224
- """
225
-
226
- # Look for state groups that can be cleaned up.
227
- def get_next_state_groups_txn (txn : LoggingTransaction ) -> Set [int ]:
228
- state_group_sql = "SELECT id FROM state_groups WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
229
- txn .execute (
230
- state_group_sql , (last_checked_state_group , max_state_group , batch_size )
231
- )
232
-
233
- next_set = {row [0 ] for row in txn }
234
-
235
- return next_set
236
-
237
- next_set = await self .stores .state .db_pool .runInteraction (
238
- "get_next_state_groups" , get_next_state_groups_txn
239
- )
240
-
241
- final_batch = False
242
- if len (next_set ) < batch_size :
243
- final_batch = True
244
- else :
245
- last_checked_state_group = max (next_set )
246
-
247
- if len (next_set ) == 0 :
248
- return last_checked_state_group , final_batch
249
-
250
- # Find all state groups that can be deleted if the original set is deleted.
251
- # This set includes the original set, as well as any state groups that would
252
- # become unreferenced upon deleting the original set.
253
- to_delete = await self ._find_unreferenced_groups (next_set )
254
-
255
- if len (to_delete ) == 0 :
256
- return last_checked_state_group , final_batch
257
-
258
- await self .stores .state_deletion .mark_state_groups_as_pending_deletion (
259
- to_delete
260
- )
261
-
262
- return last_checked_state_group , final_batch
263
-
264
- async def _find_unreferenced_groups (
265
- self ,
266
- state_groups : Collection [int ],
267
- ) -> Set [int ]:
268
- """Used when purging history to figure out which state groups can be
269
- deleted.
270
-
271
- Args:
272
- state_groups: Set of state groups referenced by events
273
- that are going to be deleted.
274
-
275
- Returns:
276
- The set of state groups that can be deleted.
277
- """
278
- # Set of events that we have found to be referenced by events
279
- referenced_groups = set ()
280
-
281
- # Set of state groups we've already seen
282
- state_groups_seen = set (state_groups )
283
-
284
- # Set of state groups to handle next.
285
- next_to_search = set (state_groups )
286
- while next_to_search :
287
- # We bound size of groups we're looking up at once, to stop the
288
- # SQL query getting too big
289
- if len (next_to_search ) < 100 :
290
- current_search = next_to_search
291
- next_to_search = set ()
292
- else :
293
- current_search = set (itertools .islice (next_to_search , 100 ))
294
- next_to_search -= current_search
295
-
296
- referenced = await self .stores .main .get_referenced_state_groups (
297
- current_search
298
- )
299
- referenced_groups |= referenced
300
-
301
- # We don't continue iterating up the state group graphs for state
302
- # groups that are referenced.
303
- current_search -= referenced
304
-
305
- edges = await self .stores .state .get_previous_state_groups (current_search )
306
-
307
- prevs = set (edges .values ())
308
- # We don't bother re-handling groups we've already seen
309
- prevs -= state_groups_seen
310
- next_to_search |= prevs
311
- state_groups_seen |= prevs
312
-
313
- # We also check to see if anything referencing the state groups are
314
- # also unreferenced. This helps ensure that we delete unreferenced
315
- # state groups, if we don't then we will de-delta them when we
316
- # delete the other state groups leading to increased DB usage.
317
- next_edges = await self .stores .state .get_next_state_groups (current_search )
318
- nexts = set (next_edges .keys ())
319
- nexts -= state_groups_seen
320
- next_to_search |= nexts
321
- state_groups_seen |= nexts
322
-
323
- to_delete = state_groups_seen - referenced_groups
324
-
325
- return to_delete
0 commit comments