Skip to content

Commit

Permalink
RavenDB-22835 : address PR comments - move cancellation token from Ta…
Browse files Browse the repository at this point in the history
…sk.Run into CountersRepairTask, move FixCountersForDocuments from CountersStorage to CountersRepairTask
  • Loading branch information
aviv committed Dec 3, 2024
1 parent 6b0b452 commit a83768c
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 152 deletions.
167 changes: 153 additions & 14 deletions src/Raven.Server/Documents/CountersRepairTask.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
using Raven.Server.ServerWide.Context;
using Sparrow.Server.Utils;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Raven.Server.ServerWide.Context;
using Sparrow.Binary;
using Sparrow.Json;
using Sparrow.Json.Parsing;
using Sparrow.Logging;
using Sparrow.Server.Utils;
using Voron;
using Voron.Data.Tables;
using static Raven.Server.Documents.CountersStorage;
using static Raven.Server.Documents.DocumentsStorage;

namespace Raven.Server.Documents;

public class CountersRepairTask
{
private readonly DocumentDatabase _database;

private readonly CancellationToken _cancellationToken;
private readonly Logger _logger;

public static string Completed = string.Empty;

public CountersRepairTask(DocumentDatabase database)
public CountersRepairTask(DocumentDatabase database, CancellationToken databaseShutdown)
{
_database = database;
_cancellationToken = databaseShutdown;
_logger = LoggingSource.Instance.GetLogger<CountersRepairTask>(_database.Name);
}

Expand All @@ -41,20 +47,24 @@ public async Task Start(string lastProcessedKey)
{
while (true)
{
_cancellationToken.ThrowIfCancellationRequested();

using (_database.DocumentsStorage.ContextPool.AllocateOperationContext(out DocumentsOperationContext context))
using (context.OpenReadTransaction())
{
var table = new Table(CountersStorage.CountersSchema, context.Transaction.InnerTransaction);
var table = new Table(CountersSchema, context.Transaction.InnerTransaction);

while (true)
{
bool hasMore = false;

foreach (var (_, tvh) in table.SeekByPrimaryKeyPrefix(Slices.BeforeAllKeys, startAfter: startAfterSliceHolder?.GetStartAfterSlice(context) ?? Slices.Empty, skip: 0))
{
_cancellationToken.ThrowIfCancellationRequested();

hasMore = true;

using (var docId = CountersStorage.ExtractDocId(context, ref tvh.Reader))
using (var docId = ExtractDocId(context, ref tvh.Reader))
{
if (docId != lastDocId)
{
Expand All @@ -66,10 +76,10 @@ public async Task Start(string lastProcessedKey)
}
}

using (var data = CountersStorage.GetCounterValuesData(context, ref tvh.Reader))
using (var data = GetCounterValuesData(context, ref tvh.Reader))
{
data.TryGet(CountersStorage.Values, out BlittableJsonReaderObject counterValues);
data.TryGet(CountersStorage.CounterNames, out BlittableJsonReaderObject counterNames);
data.TryGet(Values, out BlittableJsonReaderObject counterValues);
data.TryGet(CounterNames, out BlittableJsonReaderObject counterNames);

if (counterValues.Count == counterNames.Count)
{
Expand Down Expand Up @@ -105,7 +115,6 @@ public async Task Start(string lastProcessedKey)
startAfterSliceHolder?.Dispose();
return;
}

}
}
}
Expand Down Expand Up @@ -133,6 +142,136 @@ private void MarkAsCompleted()
}
}

internal int FixCountersForDocuments(DocumentsOperationContext context, List<string> docIds, bool hasMore)
{
var numOfCounterGroupsFixed = 0;
foreach (var docId in docIds)
{
numOfCounterGroupsFixed += FixCountersForDocument(context, docId);
}

var lastProcessedKey = hasMore ? docIds[^1] : Completed;
_database.DocumentsStorage.SetLastFixedCounterKey(context, lastProcessedKey);

return numOfCounterGroupsFixed;
}

internal unsafe int FixCountersForDocument(DocumentsOperationContext context, string documentId)
{
List<string> allNames = null;
LazyStringValue collection = default;

Table writeTable = default;
int numOfCounterGroupFixed = 0;
CollectionName collectionName = default;
try
{
var table = new Table(CountersSchema, context.Transaction.InnerTransaction);

using (DocumentIdWorker.GetSliceFromId(context, documentId, out Slice key, separator: SpecialChars.RecordSeparator))
{
foreach (var result in table.SeekByPrimaryKeyPrefix(key, Slices.Empty, 0))
{
var tvr = result.Value.Reader;
BlittableJsonReaderObject data;

using (data = GetCounterValuesData(context, ref tvr))
{
data = data.Clone(context);
}

data.TryGet(Values, out BlittableJsonReaderObject counterValues);
data.TryGet(CounterNames, out BlittableJsonReaderObject counterNames);

if (counterValues.Count == counterNames.Count)
{
var counterValuesPropertyNames = counterValues.GetSortedPropertyNames();
var counterNamesPropertyNames = counterNames.GetSortedPropertyNames();
if (counterValuesPropertyNames.SequenceEqual(counterNamesPropertyNames))
continue;
}

if (collection == null)
{
collection = TableValueToId(context, (int)CountersTable.Collection, ref tvr);
collectionName = _database.DocumentsStorage.ExtractCollectionName(context, collection);

writeTable = _database.DocumentsStorage.CountersStorage.GetOrCreateTable(context.Transaction.InnerTransaction, CountersSchema, collectionName, CollectionTableType.CounterGroups);
}

BlittableJsonReaderObject.PropertyDetails prop = default;
var originalNames = new DynamicJsonValue();

for (int i = 0; i < counterValues.Count; i++)
{
counterValues.GetPropertyByIndex(i, ref prop);

var lowerCasedCounterName = prop.Name;
if (counterNames.TryGet(lowerCasedCounterName, out string counterNameToUse) == false)
{
// CounterGroup document is corrupted - missing counter name
allNames ??= _database.DocumentsStorage.CountersStorage.GetCountersForDocument(context, documentId).ToList();
var location = allNames.BinarySearch(lowerCasedCounterName, StringComparer.OrdinalIgnoreCase);

// if we don't have the counter name in its original casing - we'll use the lowered-case name instead
counterNameToUse = location < 0
? lowerCasedCounterName
: allNames[location];
}

originalNames[lowerCasedCounterName] = counterNameToUse;
}

data.Modifications = new DynamicJsonValue(data)
{
[CounterNames] = originalNames
};

using (var old = data)
{
data = context.ReadObject(data, documentId, BlittableJsonDocumentBuilder.UsageMode.ToDisk);
}

// we're using the same change vector and etag here, in order to avoid replicating
// the counter group to other nodes (each node should fix its counters locally)
using var changeVector = TableValueToString(context, (int)CountersTable.ChangeVector, ref tvr);
var groupEtag = TableValueToEtag((int)CountersTable.Etag, ref tvr);

using (var counterGroupKey = TableValueToString(context, (int)CountersTable.CounterKey, ref tvr))
using (context.Allocator.Allocate(counterGroupKey.Size, out var buffer))
{
counterGroupKey.CopyTo(buffer.Ptr);

using (var clonedKey = context.AllocateStringValue(null, buffer.Ptr, buffer.Length))
using (Slice.External(context.Allocator, clonedKey, out var countersGroupKey))
using (Slice.From(context.Allocator, changeVector, out var cv))
using (DocumentIdWorker.GetStringPreserveCase(context, collectionName.Name, out Slice collectionSlice))
using (writeTable.Allocate(out TableValueBuilder tvb))
{
tvb.Add(countersGroupKey);
tvb.Add(Bits.SwapBytes(groupEtag));
tvb.Add(cv);
tvb.Add(data.BasePointer, data.Size);
tvb.Add(collectionSlice);
tvb.Add(context.GetTransactionMarker());

writeTable.Set(tvb);
}
}

numOfCounterGroupFixed++;
}
}
}
finally
{
collection?.Dispose();
}

return numOfCounterGroupFixed;

}

private class StartAfterSliceHolder : IDisposable
{
private readonly string _docId;
Expand Down Expand Up @@ -167,7 +306,7 @@ public unsafe Slice GetStartAfterSlice(DocumentsOperationContext context)
}
}

public class ExecuteFixCounterGroupsCommand : TransactionOperationsMerger.MergedTransactionCommand
private class ExecuteFixCounterGroupsCommand : TransactionOperationsMerger.MergedTransactionCommand
{
private readonly List<string> _docIds;
private readonly bool _hasMore;
Expand All @@ -182,7 +321,7 @@ public ExecuteFixCounterGroupsCommand(DocumentDatabase database, List<string> do

protected override long ExecuteCmd(DocumentsOperationContext context)
{
var numOfCounterGroupFixed = _database.DocumentsStorage.CountersStorage.FixCountersForDocuments(context, _docIds, _hasMore);
var numOfCounterGroupFixed = _database.CountersRepairTask.FixCountersForDocuments(context, _docIds, _hasMore);
return numOfCounterGroupFixed;
}

Expand Down
133 changes: 1 addition & 132 deletions src/Raven.Server/Documents/CountersStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using Raven.Client.Documents.Changes;
Expand Down Expand Up @@ -1161,136 +1160,6 @@ public bool PutCounters(DocumentsOperationContext context, string documentId, st
}
}

public int FixCountersForDocuments(DocumentsOperationContext context, List<string> docIds, bool hasMore)
{
var numOfCounterGroupsFixed = 0;
foreach (var docId in docIds)
{
numOfCounterGroupsFixed += FixCountersForDocument(context, docId);
}

var lastProcessedKey = hasMore ? docIds[^1] : CountersRepairTask.Completed;
_documentsStorage.SetLastFixedCounterKey(context, lastProcessedKey);

return numOfCounterGroupsFixed;
}

public int FixCountersForDocument(DocumentsOperationContext context, string documentId)
{
List<string> allNames = null;
LazyStringValue collection = default;

Table writeTable = default;
int numOfCounterGroupFixed = 0;
CollectionName collectionName = default;
try
{
var table = new Table(CountersSchema, context.Transaction.InnerTransaction);

using (DocumentIdWorker.GetSliceFromId(context, documentId, out Slice key, separator: SpecialChars.RecordSeparator))
{
foreach (var result in table.SeekByPrimaryKeyPrefix(key, Slices.Empty, 0))
{
var tvr = result.Value.Reader;
BlittableJsonReaderObject data;

using (data = GetCounterValuesData(context, ref tvr))
{
data = data.Clone(context);
}

data.TryGet(Values, out BlittableJsonReaderObject counterValues);
data.TryGet(CounterNames, out BlittableJsonReaderObject counterNames);

if (counterValues.Count == counterNames.Count)
{
var counterValuesPropertyNames = counterValues.GetSortedPropertyNames();
var counterNamesPropertyNames = counterNames.GetSortedPropertyNames();
if (counterValuesPropertyNames.SequenceEqual(counterNamesPropertyNames))
continue;
}

if (collection == null)
{
collection = TableValueToId(context, (int)CountersTable.Collection, ref tvr);
collectionName = _documentsStorage.ExtractCollectionName(context, collection);

writeTable = GetOrCreateTable(context.Transaction.InnerTransaction, CountersSchema, collectionName, CollectionTableType.CounterGroups);
}

BlittableJsonReaderObject.PropertyDetails prop = default;
var originalNames = new DynamicJsonValue();

for (int i = 0; i < counterValues.Count; i++)
{
counterValues.GetPropertyByIndex(i, ref prop);

var lowerCasedCounterName = prop.Name;
if (counterNames.TryGet(lowerCasedCounterName, out string counterNameToUse) == false)
{
// CounterGroup document is corrupted - missing counter name
allNames ??= GetCountersForDocument(context, documentId).ToList();
var location = allNames.BinarySearch(lowerCasedCounterName, StringComparer.OrdinalIgnoreCase);

// if we don't have the counter name in its original casing - we'll use the lowered-case name instead
counterNameToUse = location < 0
? lowerCasedCounterName
: allNames[location];
}

originalNames[lowerCasedCounterName] = counterNameToUse;
}

data.Modifications = new DynamicJsonValue(data)
{
[CounterNames] = originalNames
};

using (var old = data)
{
data = context.ReadObject(data, documentId, BlittableJsonDocumentBuilder.UsageMode.ToDisk);
}

// we're using the same change vector and etag here, in order to avoid replicating
// the counter group to other nodes (each node should fix its counters locally)
using var changeVector = TableValueToString(context, (int)CountersTable.ChangeVector, ref tvr);
var groupEtag = TableValueToEtag((int)CountersTable.Etag, ref tvr);

using (var counterGroupKey = TableValueToString(context, (int)CountersTable.CounterKey, ref tvr))
using (context.Allocator.Allocate(counterGroupKey.Size, out var buffer))
{
counterGroupKey.CopyTo(buffer.Ptr);

using (var clonedKey = context.AllocateStringValue(null, buffer.Ptr, buffer.Length))
using (Slice.External(context.Allocator, clonedKey, out var countersGroupKey))
using (Slice.From(context.Allocator, changeVector, out var cv))
using (DocumentIdWorker.GetStringPreserveCase(context, collectionName.Name, out Slice collectionSlice))
using (writeTable.Allocate(out TableValueBuilder tvb))
{
tvb.Add(countersGroupKey);
tvb.Add(Bits.SwapBytes(groupEtag));
tvb.Add(cv);
tvb.Add(data.BasePointer, data.Size);
tvb.Add(collectionSlice);
tvb.Add(context.GetTransactionMarker());

writeTable.Set(tvb);
}
}

numOfCounterGroupFixed++;
}
}
}
finally
{
collection?.Dispose();
}

return numOfCounterGroupFixed;

}

private void UpdateMetricsForNewCounterGroup(BlittableJsonReaderObject data)
{
_documentDatabase.Metrics.Counters.BytesPutsPerSec.MarkSingleThreaded(data.Size);
Expand Down Expand Up @@ -2045,7 +1914,7 @@ private Table GetOrCreateCounterTombstonesTable(Transaction tx, CollectionName c
return GetOrCreateTable(tx, CounterTombstonesSchema, collection, CollectionTableType.CounterTombstones);
}

private Table GetOrCreateTable(Transaction tx, TableSchema tableSchema, CollectionName collection, CollectionTableType type)
internal Table GetOrCreateTable(Transaction tx, TableSchema tableSchema, CollectionName collection, CollectionTableType type)
{
string tableName = collection.GetTableName(type);

Expand Down
Loading

0 comments on commit a83768c

Please sign in to comment.