Commit
RavenDB-22835 : address PR comment - refactor FixAllCounterGroups : instead of using 'skip', keep track of last-corrupted-doc-id and use it to create a 'StartAfter' slice, in order to start iterating from the next document
aviv committed Nov 11, 2024
1 parent 27414e6 commit 0c38c20
Showing 1 changed file with 94 additions and 37 deletions.
131 changes: 94 additions & 37 deletions src/Raven.Server/Documents/Handlers/CountersHandler.cs
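
The commit message above describes the idea: rather than passing a growing 'skip' count to the iteration on every pass (which re-reads all previously visited rows), the loop remembers the id of the last corrupted document and builds a 'StartAfter' key from it, so the next pass seeks directly to the following document. The snippet below is a minimal, self-contained C# sketch of that resumption pattern under simplified assumptions; the Store dictionary, the "docId|suffix" key layout, and the Scan helpers are hypothetical stand-ins, not RavenDB's storage API (the committed code builds a Voron slice in StartAfterSliceHolder and passes it to SeekByPrimaryKeyPrefix, as shown in the diff below).

// Minimal sketch of skip-based vs. start-after-based resumption over a sorted key space.
// All names here (Store, ScanWithSkip, ScanStartAfter) are illustrative, not RavenDB APIs.
using System;
using System.Collections.Generic;
using System.Linq;

class StartAfterSketch
{
    // Simulated sorted storage: keys are "docId|counterGroupSuffix".
    static readonly SortedDictionary<string, string> Store = new()
    {
        ["docs/1|a"] = "ok",
        ["docs/1|b"] = "corrupted",
        ["docs/2|a"] = "ok",
        ["docs/3|a"] = "corrupted",
        ["docs/3|b"] = "ok",
    };

    // Old approach (shown for contrast, not called below): every pass re-enumerates
    // and discards 'skip' rows before doing useful work again.
    static IEnumerable<KeyValuePair<string, string>> ScanWithSkip(int skip) => Store.Skip(skip);

    // New approach: seek past a key that sorts after everything belonging to the last handled document.
    static IEnumerable<KeyValuePair<string, string>> ScanStartAfter(string startAfter) =>
        Store.Where(kvp => string.CompareOrdinal(kvp.Key, startAfter) > 0);

    static void Main()
    {
        string lastCorruptedDocId = null;

        while (true)
        {
            // Append a suffix greater than any real counter-group suffix, so iteration
            // resumes at the next document (compare GetStartAfterSlice in the diff,
            // which writes long.MaxValue after the document key prefix).
            var startAfter = lastCorruptedDocId == null ? "" : lastCorruptedDocId + "|\uffff";

            var corrupted = ScanStartAfter(startAfter).FirstOrDefault(kvp => kvp.Value == "corrupted");
            if (corrupted.Key == null)
                break; // nothing left to fix

            lastCorruptedDocId = corrupted.Key.Split('|')[0];
            Console.WriteLine($"would enqueue fix for {lastCorruptedDocId}");
        }
    }
}

Compared with the skip counter, each new pass starts right after the last corrupted document instead of re-walking rows that were already examined.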
@@ -28,6 +28,7 @@
 using Sparrow.Server;
 using Voron;
 using System.Linq;
+using Sparrow.Server.Utils;

 namespace Raven.Server.Documents.Handlers
 {
@@ -846,65 +847,121 @@ private async Task FixCounterGroupsInternal(string id = null)
             }
         }

         private async Task FixAllCounterGroups()
         {
             const int maxNumberOfDocsToFixInSingleTx = 1024;
-            int skip = 0;
-            string currentId = null;
-            bool corruptedDoc = false;
-
-            while (true)
-            {
-                List<string> docIdsToFix = new();
-
-                using (Database.DocumentsStorage.ContextPool.AllocateOperationContext(out DocumentsOperationContext context))
-                using (context.OpenReadTransaction())
-                {
-                    var table = new Table(CountersStorage.CountersSchema, context.Transaction.InnerTransaction);
-
-                    foreach (var (_, tvh) in table.SeekByPrimaryKeyPrefix(Slices.BeforeAllKeys, Slices.Empty, skip: skip))
-                    {
-                        skip++;
-
-                        using (var docId = CountersStorage.ExtractDocId(context, ref tvh.Reader))
-                        {
-                            if (docId != currentId)
-                            {
-                                currentId = docId;
-                                corruptedDoc = false;
-                            }
-                            else if (corruptedDoc)
-                                continue;
-
-                            using (var data = CountersStorage.GetCounterValuesData(context, ref tvh.Reader))
-                            {
-                                data.TryGet(CountersStorage.Values, out BlittableJsonReaderObject counterValues);
-                                data.TryGet(CountersStorage.CounterNames, out BlittableJsonReaderObject counterNames);
-
-                                if (counterValues.Count == counterNames.Count)
-                                {
-                                    var counterValuesPropertyNames = counterValues.GetSortedPropertyNames();
-                                    var counterNamesPropertyNames = counterNames.GetSortedPropertyNames();
-                                    if (counterValuesPropertyNames.SequenceEqual(counterNamesPropertyNames))
-                                        continue;
-                                }
-                            }
-
-                            corruptedDoc = true;
-                            docIdsToFix.Add(docId);
-                        }
-
-                        if (docIdsToFix.Count == maxNumberOfDocsToFixInSingleTx)
-                            break;
-                    }
-                }
-
-                if (docIdsToFix.Count == 0)
-                    return;
-
-                await Database.TxMerger.Enqueue(new ExecuteFixCounterGroupsCommand(Database, docIdsToFix));
-            }
+            List<string> docIdsToFix = new();
+            StartAfterSliceHolder startAfterSliceHolder = null;
+            string lastDocId = null;
+
+            try
+            {
+                while (true)
+                {
+                    using (Database.DocumentsStorage.ContextPool.AllocateOperationContext(out DocumentsOperationContext context))
+                    using (context.OpenReadTransaction())
+                    {
+                        var table = new Table(CountersStorage.CountersSchema, context.Transaction.InnerTransaction);
+
+                        while (true)
+                        {
+                            bool hasMore = false;
+
+                            foreach (var (_, tvh) in table.SeekByPrimaryKeyPrefix(Slices.BeforeAllKeys, startAfter: startAfterSliceHolder?.GetStartAfterSlice(context) ?? Slices.Empty, skip: 0))
+                            {
+                                hasMore = true;
+
+                                using (var docId = CountersStorage.ExtractDocId(context, ref tvh.Reader))
+                                {
+                                    if (docId != lastDocId)
+                                    {
+                                        lastDocId = docId;
+
+                                        using (var old = startAfterSliceHolder)
+                                        {
+                                            startAfterSliceHolder = new StartAfterSliceHolder(lastDocId);
+                                        }
+                                    }
+
+                                    using (var data = CountersStorage.GetCounterValuesData(context, ref tvh.Reader))
+                                    {
+                                        data.TryGet(CountersStorage.Values, out BlittableJsonReaderObject counterValues);
+                                        data.TryGet(CountersStorage.CounterNames, out BlittableJsonReaderObject counterNames);
+
+                                        if (counterValues.Count == counterNames.Count)
+                                        {
+                                            var counterValuesPropertyNames = counterValues.GetSortedPropertyNames();
+                                            var counterNamesPropertyNames = counterNames.GetSortedPropertyNames();
+                                            if (counterValuesPropertyNames.SequenceEqual(counterNamesPropertyNames))
+                                                continue;
+                                        }
+                                    }
+
+                                    // Document is corrupted; add to list and break to skip remaining CounterGroups of current document
+                                    docIdsToFix.Add(lastDocId);
+
+                                    break;
+                                }
+                            }
+
+                            if (docIdsToFix.Count == 0)
+                                return;
+
+                            if (docIdsToFix.Count < maxNumberOfDocsToFixInSingleTx && hasMore)
+                                continue;
+
+                            await Database.TxMerger.Enqueue(new ExecuteFixCounterGroupsCommand(Database, docIdsToFix));
+                            docIdsToFix.Clear();
+
+                            if (hasMore)
+                                break; // break from inner while loop in order to open a new read tx
+
+                            startAfterSliceHolder?.Dispose();
+                            return;
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                startAfterSliceHolder?.Dispose();
+            }
         }
+
+        private class StartAfterSliceHolder : IDisposable
+        {
+            private readonly string _docId;
+
+            private readonly List<IDisposable> _toDispose = [];
+
+            public StartAfterSliceHolder(string docId)
+            {
+                _docId = docId;
+            }
+
+            public void Dispose()
+            {
+                foreach (var scope in _toDispose)
+                {
+                    scope.Dispose();
+                }
+
+                _toDispose.Clear();
+            }
+
+            public unsafe Slice GetStartAfterSlice(DocumentsOperationContext context)
+            {
+                _toDispose.Add(DocumentIdWorker.GetSliceFromId(context, _docId, out Slice documentKeyPrefix, separator: SpecialChars.RecordSeparator));
+                _toDispose.Add(context.Allocator.Allocate(documentKeyPrefix.Size + sizeof(long), out var startAfterBuffer));
+                _toDispose.Add(Slice.External(context.Allocator, startAfterBuffer.Ptr, startAfterBuffer.Length, out var startAfter));
+
+                documentKeyPrefix.CopyTo(startAfterBuffer.Ptr);
+                *(long*)(startAfterBuffer.Ptr + documentKeyPrefix.Size) = long.MaxValue;
+
+                return startAfter;
+            }
+        }

