From d84f2007a58da48272450d5687dbb5ae76e66a1c Mon Sep 17 00:00:00 2001 From: aviv Date: Sun, 25 Aug 2024 15:02:21 +0300 Subject: [PATCH] RavenDB-22731 : fix failing test - handle finding matching prefix for bucket migration in ShardedDocumentsMigrator --- .../Background/ShardedDocumentsMigrator.cs | 35 ++++++++++++++++--- .../Sharding/StartBucketMigrationCommand.cs | 8 ++--- src/Raven.Server/ServerWide/ShardingStore.cs | 2 +- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/Raven.Server/Documents/Sharding/Background/ShardedDocumentsMigrator.cs b/src/Raven.Server/Documents/Sharding/Background/ShardedDocumentsMigrator.cs index b4de849b33cb..18fc8d4c4f96 100644 --- a/src/Raven.Server/Documents/Sharding/Background/ShardedDocumentsMigrator.cs +++ b/src/Raven.Server/Documents/Sharding/Background/ShardedDocumentsMigrator.cs @@ -5,6 +5,7 @@ using Raven.Client.ServerWide.Sharding; using Raven.Server.ServerWide.Commands.Sharding; using Raven.Server.ServerWide.Context; +using Raven.Server.Utils; using Sparrow.Logging; using Sparrow.Utils; @@ -60,7 +61,7 @@ internal async Task ExecuteMoveDocumentsAsync() } if (bucket != -1) - await MoveDocumentsToShardAsync(bucket, moveToShard); + await MoveDocumentsToShardAsync(bucket, moveToShard, configuration); } catch (Exception e) { @@ -112,10 +113,36 @@ private bool TryFindWrongBucket(DocumentsOperationContext context, ShardingConfi return false; } - private async Task MoveDocumentsToShardAsync(int bucket, int moveToShard) + private async Task MoveDocumentsToShardAsync(int bucket, int moveToShard, ShardingConfiguration configuration) { - var cmd = new StartBucketMigrationCommand(bucket, _database.ShardNumber, moveToShard, _database.ShardedDatabaseName, - $"{Guid.NewGuid()}/{bucket}"); + string prefix = null; + if (bucket >= ShardHelper.NumberOfBuckets) + { + // bucket belongs to a prefixed range + // need to find the corresponding prefix setting in order to validate the destination shard + + foreach (var setting in configuration.Prefixed) + { + var bucketRangeStart = setting.BucketRangeStart; + var nextRangeStart = bucketRangeStart + ShardHelper.NumberOfBuckets; + + if (bucket < bucketRangeStart || bucket >= nextRangeStart) + continue; + + prefix = setting.Prefix; + break; + } + + if (string.IsNullOrEmpty(prefix)) + throw new InvalidOperationException($"Bucket {bucket} should belong to a prefixed range, but a corresponding {nameof(PrefixedShardingSetting)} wasn't found in database record"); + } + + var cmd = new StartBucketMigrationCommand(bucket, + sourceShard: _database.ShardNumber, + destShard: moveToShard, + _database.ShardedDatabaseName, + prefix, + raftId: $"{Guid.NewGuid()}/{bucket}"); await _database.ServerStore.SendToLeaderAsync(cmd); } diff --git a/src/Raven.Server/ServerWide/Commands/Sharding/StartBucketMigrationCommand.cs b/src/Raven.Server/ServerWide/Commands/Sharding/StartBucketMigrationCommand.cs index aa1debca9def..e7444b105068 100644 --- a/src/Raven.Server/ServerWide/Commands/Sharding/StartBucketMigrationCommand.cs +++ b/src/Raven.Server/ServerWide/Commands/Sharding/StartBucketMigrationCommand.cs @@ -28,21 +28,17 @@ public StartBucketMigrationCommand() { } - public StartBucketMigrationCommand(int bucket, int destShard, string database, string prefix, string raftId) : base(database, raftId) + public StartBucketMigrationCommand(int bucket, int? sourceShard, int destShard, string database, string prefix, string raftId) : base(database, raftId) { if (bucket >= ShardHelper.NumberOfBuckets && string.IsNullOrEmpty(prefix)) throw new InvalidOperationException($"Bucket {bucket} belongs to a prefixed range, but 'prefix' parameter wasn't provided"); + SourceShard = sourceShard; Bucket = bucket; DestinationShard = destShard; Prefix = prefix; } - public StartBucketMigrationCommand(int bucket, int sourceShard, int destShard, string database, string raftId) : this(bucket, destShard, database, prefix: null, raftId) - { - SourceShard = sourceShard; - } - public override void UpdateDatabaseRecord(DatabaseRecord record, long etag) { var sourceShard = SourceShard ?? ShardHelper.GetShardNumberFor(record.Sharding, Bucket); diff --git a/src/Raven.Server/ServerWide/ShardingStore.cs b/src/Raven.Server/ServerWide/ShardingStore.cs index 9f8b8fced56b..9a998ff7dfa7 100644 --- a/src/Raven.Server/ServerWide/ShardingStore.cs +++ b/src/Raven.Server/ServerWide/ShardingStore.cs @@ -35,7 +35,7 @@ public ShardingStore([NotNull] ServerStore serverStore) public Task<(long Index, object Result)> StartBucketMigration(string database, int bucket, int toShard, string prefix, string raftId) { - var cmd = new StartBucketMigrationCommand(bucket, toShard, database, prefix, raftId ?? RaftIdGenerator.NewId()); + var cmd = new StartBucketMigrationCommand(bucket, sourceShard: null, toShard, database, prefix, raftId ?? RaftIdGenerator.NewId()); return _serverStore.SendToLeaderAsync(cmd); }