Skip to content

Commit

Permalink
RavenDB-22835 : address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
aviv committed Oct 10, 2024
1 parent f433991 commit 4b0dc52
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 42 deletions.
54 changes: 40 additions & 14 deletions src/Raven.Server/Documents/CountersStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,12 @@ public string PutOrIncrementCounter(DocumentsOperationContext context, string do
var lowerName = Encodings.Utf8.GetString(counterName.Content.Ptr, counterName.Content.Length);

Slice countersGroupKey;
if (table.SeekOneBackwardByPrimaryKeyPrefix(documentKeyPrefix, counterKeySlice, out var existing))
if (table.SeekOneBackwardByPrimaryKeyPrefix(documentKeyPrefix, counterKeySlice, out var existing) == false)
{
data = WriteNewCountersDocument(context, originalName: name, lowerName, value);
countersGroupKey = documentKeyPrefix;
}
else
{
countersGroupKeyScope = Slice.From(context.Allocator, existing.Read((int)CountersTable.CounterKey, out var size), size, out countersGroupKey);

Expand All @@ -412,6 +417,8 @@ public string PutOrIncrementCounter(DocumentsOperationContext context, string do
if (data.TryGet(CounterNames, out BlittableJsonReaderObject originalNames) == false)
ThrowMissingProperty(counterKeySlice, CounterNames);

CheckForCorruptedCountersData(documentId, counters, originalNames);

var counterEtag = _documentsStorage.GenerateNextEtag();

counters.TryGetMember(lowerName, out object existingCounter);
Expand All @@ -434,7 +441,8 @@ public string PutOrIncrementCounter(DocumentsOperationContext context, string do
var existingChangeVector = TableValueToChangeVector(context, (int)CountersTable.ChangeVector, ref existing);
using (data)
{
SplitCounterGroup(context, collectionName, table, documentKeyPrefix, countersGroupKey, counters, dbIds, originalNames, existingChangeVector);
SplitCounterGroup(context, collectionName, table, documentKeyPrefix, countersGroupKey, counters, dbIds, originalNames,
existingChangeVector);
}

// now we retry and know that we have enough space
Expand All @@ -446,10 +454,7 @@ public string PutOrIncrementCounter(DocumentsOperationContext context, string do
if (existingCounter == null)
{
// new counter
originalNames.Modifications = new DynamicJsonValue
{
[lowerName] = name
};
originalNames.Modifications = new DynamicJsonValue { [lowerName] = name };
}
}

Expand All @@ -461,11 +466,6 @@ public string PutOrIncrementCounter(DocumentsOperationContext context, string do
}
}
}
else
{
data = WriteNewCountersDocument(context, originalName: name, lowerName, value);
countersGroupKey = documentKeyPrefix;
}

var groupEtag = _documentsStorage.GenerateNextEtag();
var changeVector = _documentsStorage.GetNewChangeVector(context, groupEtag);
Expand Down Expand Up @@ -520,6 +520,23 @@ public string PutOrIncrementCounter(DocumentsOperationContext context, string do
}
}

private static void CheckForCorruptedCountersData(string documentId, BlittableJsonReaderObject counterValues, BlittableJsonReaderObject counterNames)
{
// RavenDB-22835
// counters data might be corrupted and needs to be fixed before adding/incrementing new counters

var msg = $"Counters data of document '{documentId}' is corrupted. " +
$"Cannot add/increment counters while counter data is in this corrupted state. Please use FixCounters tool (/databases/*/counters/fix-document) in order to add/increment counters of this document";

if (counterValues.Count != counterNames.Count)
throw new InvalidDataException(msg);

var counterValuesPropertyNames = counterValues.GetSortedPropertyNames();
var counterNamesPropertyNames = counterNames.GetSortedPropertyNames();
if (counterValuesPropertyNames.SequenceEqual(counterNamesPropertyNames) == false)
throw new InvalidDataException(msg);
}

internal static void ThrowMissingProperty(Slice counterKeySlice, string property)
{
throw new InvalidDataException($"Counter-Group document '{counterKeySlice}' is missing '{property}' property. Shouldn't happen");
Expand Down Expand Up @@ -1162,6 +1179,17 @@ public bool PutCounters(DocumentsOperationContext context, string documentId, st
}
}

public int FixCountersForDocuments(DocumentsOperationContext context, List<string> docIds)
{
var numOfCounterGroupsFixed = 0;
foreach (var docId in docIds)
{
numOfCounterGroupsFixed += FixCountersForDocument(context, docId);
}

return numOfCounterGroupsFixed;
}

public int FixCountersForDocument(DocumentsOperationContext context, string documentId)
{
List<string> allNames = null;
Expand All @@ -1188,8 +1216,6 @@ public int FixCountersForDocument(DocumentsOperationContext context, string docu

data.TryGet(Values, out BlittableJsonReaderObject counterValues);
data.TryGet(CounterNames, out BlittableJsonReaderObject counterNames);
if (counterValues == null || counterNames == null)
throw new InvalidDataException("Invalid CounterGroup data - missing counter values or counter names");

if (counterValues.Count == counterNames.Count)
{
Expand All @@ -1202,7 +1228,7 @@ public int FixCountersForDocument(DocumentsOperationContext context, string docu
if (collection == null)
{
collection = TableValueToId(context, (int)CountersTable.Collection, ref tvr);
collectionName = _documentsStorage.ExtractCollectionName(context, collection); //todo
collectionName = _documentsStorage.ExtractCollectionName(context, collection);

writeTable = GetOrCreateTable(context.Transaction.InnerTransaction, CountersSchema, collectionName, CollectionTableType.CounterGroups);
}
Expand Down
96 changes: 77 additions & 19 deletions src/Raven.Server/Documents/Handlers/CountersHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using Sparrow.Json.Parsing;
using Sparrow.Server;
using Voron;
using System.Linq;

namespace Raven.Server.Documents.Handlers
{
Expand Down Expand Up @@ -762,18 +763,23 @@ private static void ThrowMissingDocument(string docId)

public class ExecuteFixCounterGroupsCommand : TransactionOperationsMerger.MergedTransactionCommand
{
private readonly string _docId;
private readonly List<string> _docIds;
private readonly DocumentDatabase _database;

public ExecuteFixCounterGroupsCommand(DocumentDatabase database, string docId)

public ExecuteFixCounterGroupsCommand(DocumentDatabase database, string docId) : this(database, [docId])
{
}

public ExecuteFixCounterGroupsCommand(DocumentDatabase database, List<string> docIdsToFix)
{
_docId = docId;
_docIds = docIdsToFix;
_database = database;
}

protected override long ExecuteCmd(DocumentsOperationContext context)
{
var numOfCounterGroupFixed = _database.DocumentsStorage.CountersStorage.FixCountersForDocument(context, _docId);
var numOfCounterGroupFixed = _database.DocumentsStorage.CountersStorage.FixCountersForDocuments(context, _docIds);
return numOfCounterGroupFixed;
}

Expand All @@ -783,16 +789,27 @@ protected override long ExecuteCmd(DocumentsOperationContext context)
}
}

[RavenAction("/databases/*/counters/fix-document", "PATCH", AuthorizationStatus.ValidUser, EndpointType.Write)]
public async Task FixCounterGroupsForDocument()
{
var docId = GetStringQueryString("id");
await FixCounterGroupsInternal(docId);
}

[RavenAction("/databases/*/counters/fix", "PATCH", AuthorizationStatus.ValidUser, EndpointType.Write)]
public async Task FixCounterGroups()
public async Task FixCounterGroupsForDatabase() => await FixCounterGroupsInternal();

private async Task FixCounterGroupsInternal(string id = null)
{
var first = GetBoolValueQueryString("first", required: false) ?? true;
var task = FixAllCounterGroups();

Task task = id == null
? FixAllCounterGroups()
: Database.TxMerger.Enqueue(new ExecuteFixCounterGroupsCommand(Database, id));

if (first == false)
{
await task;
await NoContent();
return;
}

Expand All @@ -812,12 +829,13 @@ public async Task FixCounterGroups()
toDispose.Add(requestExecutor);

var cmd = new FixCounterGroupsCommand(Database.Name);
tasks.Add(requestExecutor.ExecuteAsync(cmd, context));

toDispose.Add(requestExecutor.ContextPool.AllocateOperationContext(out JsonOperationContext ctx));
tasks.Add(requestExecutor.ExecuteAsync(cmd, ctx));
}
}

await Task.WhenAll(tasks);
await NoContent();
}
finally
{
Expand All @@ -828,24 +846,64 @@ public async Task FixCounterGroups()
}
}


private async Task FixAllCounterGroups()
{
using (Database.DocumentsStorage.ContextPool.AllocateOperationContext(out DocumentsOperationContext context))
using (context.OpenReadTransaction())
const int maxNumberOfDocsToFixInSingleTx = 1024;
int skip = 0;
string currentId = null;
bool corruptedDoc = false;

while (true)
{
var table = new Table(CountersStorage.CountersSchema, context.Transaction.InnerTransaction);
string currentId = null;
List<string> docIdsToFix = new();

foreach (var (_, tvh) in table.SeekByPrimaryKeyPrefix(Slices.BeforeAllKeys, Slices.Empty, skip: 0))
using (Database.DocumentsStorage.ContextPool.AllocateOperationContext(out DocumentsOperationContext context))
using (context.OpenReadTransaction())
{
using (var docId = CountersStorage.ExtractDocId(context, ref tvh.Reader))
var table = new Table(CountersStorage.CountersSchema, context.Transaction.InnerTransaction);

foreach (var (_, tvh) in table.SeekByPrimaryKeyPrefix(Slices.BeforeAllKeys, Slices.Empty, skip: skip))
{
if (docId == currentId)
continue;
skip++;

using (var docId = CountersStorage.ExtractDocId(context, ref tvh.Reader))
{
if (docId != currentId)
{
currentId = docId;
corruptedDoc = false;
}

else if (corruptedDoc)
continue;

using (var data = CountersStorage.GetCounterValuesData(context, ref tvh.Reader))
{
data.TryGet(CountersStorage.Values, out BlittableJsonReaderObject counterValues);
data.TryGet(CountersStorage.CounterNames, out BlittableJsonReaderObject counterNames);

if (counterValues.Count == counterNames.Count)
{
var counterValuesPropertyNames = counterValues.GetSortedPropertyNames();
var counterNamesPropertyNames = counterNames.GetSortedPropertyNames();
if (counterValuesPropertyNames.SequenceEqual(counterNamesPropertyNames))
continue;
}
}

corruptedDoc = true;
docIdsToFix.Add(docId);
}

currentId = docId;
await Database.TxMerger.Enqueue(new ExecuteFixCounterGroupsCommand(Database, docId));
if (docIdsToFix.Count == maxNumberOfDocsToFixInSingleTx)
break;
}

if (docIdsToFix.Count == 0)
return;

await Database.TxMerger.Enqueue(new ExecuteFixCounterGroupsCommand(Database, docIdsToFix));
}
}
}
Expand Down
Loading

0 comments on commit 4b0dc52

Please sign in to comment.