From 6761ca97be207409c1e5c635959da6176f71b653 Mon Sep 17 00:00:00 2001 From: Ivan Zlenko <241953+ivanzlenko@users.noreply.github.com> Date: Wed, 12 Feb 2025 12:17:27 +0500 Subject: [PATCH] IGNITE-24446 Implement utilities necessary for zone assignments calculation (#5200) --- .../rebalance/ZoneRebalanceUtil.java | 31 ++ .../RebalanceUtilUpdateAssignmentsTest.java | 335 +++------------ ...oneRebalanceUtilUpdateAssignmentsTest.java | 381 ++++++++++++++++++ 3 files changed, 465 insertions(+), 282 deletions(-) create mode 100644 modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtilUpdateAssignmentsTest.java diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java index b0777e3e777..d702d1931a0 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java @@ -659,4 +659,35 @@ private static CompletableFuture> zoneAssignments( return numberOfMsPartitions == 0 ? Map.of() : result; }); } + + /** + * Returns zone pending assignments for all zone partitions from meta storage locally. + * + * @param metaStorageManager Meta storage manager. + * @param zoneId Zone id. + * @param numberOfPartitions Number of partitions. + * @param revision Revision. + * @return Future with zone assignments as a value. + */ + public static List zonePendingAssignmentsGetLocally( + MetaStorageManager metaStorageManager, + int zoneId, + int numberOfPartitions, + long revision + ) { + return IntStream.range(0, numberOfPartitions) + .mapToObj(partitionId -> getLocalAssignments(metaStorageManager, zoneId, partitionId, revision)) + .collect(toList()); + } + + private static @Nullable Assignments getLocalAssignments( + MetaStorageManager metaStorageManager, + int zoneId, + int partitionId, + long revision + ) { + Entry entry = metaStorageManager.getLocally(pendingPartAssignmentsKey(new ZonePartitionId(zoneId, partitionId)), revision); + + return entry != null ? Assignments.fromBytes(entry.value()) : null; + } } diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java index 1ab32297984..f6df8cb16a3 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java @@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; @@ -42,6 +43,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode; @@ -77,8 +79,12 @@ import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -95,8 +101,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { private SimpleInMemoryKeyValueStorage keyValueStorage; - private ClusterService clusterService; - + @Mock private MetaStorageManager metaStorageManager; private final CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor( @@ -132,9 +137,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { @BeforeEach public void setUp() { - clusterService = mock(ClusterService.class); - - metaStorageManager = mock(MetaStorageManager.class); + ClusterService clusterService = mock(ClusterService.class); AtomicLong raftIndex = new AtomicLong(); @@ -208,11 +211,7 @@ public void result(@Nullable Serializable r) { return metaStorageService.run(multiInvokeCommand); }).when(metaStorageManager).invoke(any()); - when(clusterService.messagingService()).thenAnswer(invocation -> { - MessagingService ret = mock(MessagingService.class); - - return ret; - }); + when(clusterService.messagingService()).thenAnswer(invocation -> mock(MessagingService.class)); assignmentsTimestamp = clock.now().longValue(); } @@ -222,277 +221,50 @@ public void tearDown() { keyValueStorage.close(); } - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=null, pending=null, planned=null. - * Expected assignments in the metastorage after updating: stable=null, pending=assignments1, planned=null. - */ - @Test - void test1() { - test( - nodes1, assignments2, - null, null, null, - null, assignments1, null, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments1. - * Current assignments in the metastorage: stable=null, pending=null, planned=null. - * Expected assignments in the metastorage after updating: stable=null, pending=null, planned=null. - */ - @Test - void test2() { - test( - nodes1, assignments1, - null, null, null, - null, null, null, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=null, pending=assignments3, planned=null. - * Expected assignments in the metastorage after updating: stable=null, pending=assignments3, planned=assignments1. - */ - @Test - void test3() { - test( - nodes1, assignments2, - null, assignments3, null, - null, assignments3, assignments1, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments1. - * Current assignments in the metastorage: stable=null, pending=assignments3, planned=null. - * Expected assignments in the metastorage after updating: stable=null, pending=assignments3, planned=assignments1. - */ - @Test - void test4() { - test( - nodes1, assignments1, - null, assignments3, null, - null, assignments3, assignments1, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=assignments3, pending=null, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments1, planned=null. - */ - @Test - void test5() { - test( - nodes1, assignments2, - assignments3, null, null, - assignments3, assignments1, null, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments1. - * Current assignments in the metastorage: stable=assignments3, pending=null, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments1, planned=null. - */ - @Test - void test6() { - test( - nodes1, assignments1, - assignments3, null, null, - assignments3, assignments1, null, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=assignments1, pending=null, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments1, pending=null, planned=null. - */ - @Test - void test7() { - test( - nodes1, assignments2, - assignments1, null, null, - assignments1, null, null, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments1. - * Current assignments in the metastorage: stable=assignments1, pending=null, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments1, pending=null, planned=null. - */ - @Test - void test8() { - test( - nodes1, assignments1, - assignments1, null, null, - assignments1, null, null, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=assignments2, pending=null, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments2, pending=assignments1, planned=null. - */ - @Test - void test9() { - test( - nodes1, assignments2, - assignments2, null, null, - assignments2, assignments1, null, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=assignments4, pending=assignments3, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments4, pending=assignments3, planned=assignments1. - */ - @Test - void test10() { - test( - nodes1, assignments2, - assignments4, assignments3, null, - assignments4, assignments3, assignments1, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments1. - * Current assignments in the metastorage: stable=assignments3, pending=assignments2, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments2, planned=assignments1. - */ - @Test - void test11() { - test( - nodes1, assignments1, - assignments3, assignments2, null, - assignments3, assignments2, assignments1, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=assignments1, pending=assignments3, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments3, planned=assignments1. - */ - @Test - void test12() { - test( - nodes1, assignments2, - assignments1, assignments3, null, - assignments1, assignments3, assignments1, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=assignments2, pending=assignments3, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments2, pending=assignments3, planned=assignments1. - */ - @Test - void test13() { - test( - nodes1, assignments2, - assignments2, assignments3, null, - assignments2, assignments3, assignments1, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments1. - * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=null. - * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1. - */ - @Test - void test14() { - test( - nodes1, assignments1, - assignments1, assignments2, null, - assignments1, assignments2, assignments1, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments1. - * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments3. - * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1. - */ - @Test - void test15() { - test( - nodes1, assignments1, - assignments1, assignments2, assignments3, - assignments1, assignments2, assignments1, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes1. - * The table configuration assignments: assignments4. - * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1. - * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1. - */ - @Test - void test16() { - test( - nodes1, assignments4, - assignments1, assignments2, assignments1, - assignments1, assignments2, assignments1, assignmentsTimestamp + private static Stream assignmentsProvider() { + return Stream.of( + arguments(nodes1, assignments2, null, null, null, null, assignments1, null), + arguments(nodes1, assignments1, null, null, null, null, null, null), + arguments(nodes1, assignments2, null, assignments3, null, null, assignments3, assignments1), + arguments(nodes1, assignments1, null, assignments3, null, null, assignments3, assignments1), + arguments(nodes1, assignments2, assignments3, null, null, assignments3, assignments1, null), + arguments(nodes1, assignments1, assignments3, null, null, assignments3, assignments1, null), + arguments(nodes1, assignments2, assignments1, null, null, assignments1, null, null), + arguments(nodes1, assignments1, assignments1, null, null, assignments1, null, null), + arguments(nodes1, assignments2, assignments2, null, null, assignments2, assignments1, null), + arguments(nodes1, assignments2, assignments4, assignments3, null, assignments4, assignments3, assignments1), + arguments(nodes1, assignments1, assignments3, assignments2, null, assignments3, assignments2, assignments1), + arguments(nodes1, assignments2, assignments1, assignments3, null, assignments1, assignments3, assignments1), + arguments(nodes1, assignments2, assignments2, assignments3, null, assignments2, assignments3, assignments1), + arguments(nodes1, assignments1, assignments1, assignments2, null, assignments1, assignments2, assignments1), + arguments(nodes1, assignments1, assignments1, assignments2, assignments3, assignments1, assignments2, assignments1), + arguments(nodes1, assignments4, assignments1, assignments2, assignments1, assignments1, assignments2, assignments1), + arguments(nodes2, assignments2, assignments1, assignments2, assignments1, assignments1, assignments2, null), + arguments(nodes2, assignments4, assignments1, assignments2, assignments1, assignments1, assignments2, null) ); } /** - * Nodes for new assignments calculating: nodes2. - * The table configuration assignments: assignments2. - * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1. - * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=null. + * Verifies that the metastorage has correct assignments after invoking {@link RebalanceUtil#updatePendingAssignmentsKeys}. + * Uses {@link #assignmentsProvider()} as the parameter source. + * + * @param nodesForNewAssignments Nodes list to calculate new assignments against. + * @param tableCfgAssignments Table's assignment set from the stable configuration. + * @param currentStableAssignments Stable assignments already existing in the metastorage. + * @param currentPendingAssignments Pending assignments already existing in the metastorage. + * @param currentPlannedAssignments Planned assignments already existing in the metastorage. + * @param expectedStableAssignments Stable assignments expected in the metastorage + * after invoking {@link RebalanceUtil#updatePendingAssignmentsKeys}. + * @param expectedPendingAssignments Pending assignments expected in the metastorage + * after invoking {@link RebalanceUtil#updatePendingAssignmentsKeys}. + * @param expectedPlannedAssignments Planned assignments expected in the metastorage + * after invoking {@link RebalanceUtil#updatePendingAssignmentsKeys}. */ - @Test - void test17() { - test( - nodes2, assignments2, - assignments1, assignments2, assignments1, - assignments1, assignments2, null, assignmentsTimestamp - ); - } - - /** - * Nodes for new assignments calculating: nodes2. - * The table configuration assignments: assignments4. - * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1. - * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=null. - */ - @Test - void test18() { - test( - nodes2, assignments4, - assignments1, assignments2, assignments1, - assignments1, assignments2, null, assignmentsTimestamp - ); - } - - private void test( + @DisplayName("Verify that assignments can be updated in metastorage") + @MethodSource("assignmentsProvider") + @ParameterizedTest(name = "[{index}] new nodes: {0}; stable configuration: {1}; assignments in metastorage: [{2}, {3}, {4}];" + + " expected assignments after update: [{5}, {6}, {7}]") + void testAssignmentsUpdate( Collection nodesForNewAssignments, Set tableCfgAssignments, Set currentStableAssignments, @@ -500,8 +272,7 @@ private void test( Set currentPlannedAssignments, Set expectedStableAssignments, Set expectedPendingAssignments, - Set expectedPlannedAssignments, - long assignmentsTimestamp + Set expectedPlannedAssignments ) { TablePartitionId tablePartitionId = new TablePartitionId(1, 1); @@ -574,9 +345,9 @@ private void test( byte[] pendingChangeTriggerKey = keyValueStorage.get(RebalanceUtil.pendingChangeTriggerKey(tablePartitionId).bytes()).value(); long actualPendingChangeTrigger = bytesToLongKeepingOrder(pendingChangeTriggerKey); - LOG.info("stableAssignments " + actualStableAssignments); - LOG.info("pendingAssignments " + actualPendingAssignments); - LOG.info("plannedAssignments " + actualPlannedAssignments); + LOG.info("stableAssignments {}", actualStableAssignments); + LOG.info("pendingAssignments {}", actualPendingAssignments); + LOG.info("plannedAssignments {}", actualPlannedAssignments); if (expectedStableAssignments != null) { assertNotNull(actualStableBytes); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtilUpdateAssignmentsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtilUpdateAssignmentsTest.java new file mode 100644 index 00000000000..7f7d6330c36 --- /dev/null +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtilUpdateAssignmentsTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.distributionzones.rebalance; + +import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; +import static org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext; +import static org.apache.ignite.internal.partitiondistribution.Assignments.toBytes; +import static org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition; +import static org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder; +import static org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfilesDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory; +import org.apache.ignite.internal.metastorage.command.MetaStorageWriteCommand; +import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand; +import org.apache.ignite.internal.metastorage.dsl.Iif; +import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator; +import org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext; +import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; +import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.partitiondistribution.Assignment; +import org.apache.ignite.internal.partitiondistribution.Assignments; +import org.apache.ignite.internal.raft.Command; +import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.raft.service.CommandClosure; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +/** + * Tests for updating assignment in the meta storage. + */ +@ExtendWith({MockitoExtension.class, ConfigurationExtension.class}) +@MockitoSettings(strictness = Strictness.LENIENT) +public class ZoneRebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { + private static final IgniteLogger LOG = Loggers.forClass(ZoneRebalanceUtilUpdateAssignmentsTest.class); + + private static final KeyValueUpdateContext KV_UPDATE_CONTEXT = kvContext(HybridTimestamp.MIN_VALUE); + + private SimpleInMemoryKeyValueStorage keyValueStorage; + + @Mock + private MetaStorageManager metaStorageManager; + + private final CatalogStorageProfileDescriptor storageProfileDescriptor = new CatalogStorageProfileDescriptor("default"); + + private final CatalogStorageProfilesDescriptor storageProfilesDescriptor = new CatalogStorageProfilesDescriptor( + List.of(storageProfileDescriptor) + ); + + private static final int partNum = 2; + private static final int replicas = 2; + + private final CatalogZoneDescriptor zoneDescriptor = new CatalogZoneDescriptor( + 1, + "zone1", + partNum, + replicas, + 1000, + 1000, + 1000, + "", + storageProfilesDescriptor, + ConsistencyMode.STRONG_CONSISTENCY + ); + + private final HybridClock clock = new HybridClockImpl(); + + private static final Set nodes1 = IntStream.of(5).mapToObj(i -> "nodes1_" + i).collect(toSet()); + private static final Set nodes2 = IntStream.of(5).mapToObj(i -> "nodes2_" + i).collect(toSet()); + private static final Set nodes3 = IntStream.of(5).mapToObj(i -> "nodes3_" + i).collect(toSet()); + private static final Set nodes4 = IntStream.of(5).mapToObj(i -> "nodes4_" + i).collect(toSet()); + + private static final Set assignments1 = calculateAssignmentForPartition(nodes1, partNum, partNum + 1, replicas); + private static final Set assignments2 = calculateAssignmentForPartition(nodes2, partNum, partNum + 1, replicas); + private static final Set assignments3 = calculateAssignmentForPartition(nodes3, partNum, partNum + 1, replicas); + private static final Set assignments4 = calculateAssignmentForPartition(nodes4, partNum, partNum + 1, replicas); + + private static final long expectedPendingChangeTriggerKey = 10L; + + private long assignmentsTimestamp; + + @BeforeEach + public void setUp() { + ClusterService clusterService = mock(ClusterService.class); + + AtomicLong raftIndex = new AtomicLong(); + + String nodeName = "test"; + + keyValueStorage = spy(new SimpleInMemoryKeyValueStorage(nodeName)); + + ClusterTimeImpl clusterTime = new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), clock); + + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage, clock, clusterTime); + + RaftGroupService metaStorageService = mock(RaftGroupService.class); + + // Delegate directly to listener. + lenient().doAnswer( + invocationClose -> { + Command cmd = invocationClose.getArgument(0); + + long commandIndex = raftIndex.incrementAndGet(); + + if (cmd instanceof MetaStorageWriteCommand) { + ((MetaStorageWriteCommand) cmd).safeTime(hybridTimestamp(10)); + } + + CompletableFuture res = new CompletableFuture<>(); + + CommandClosure clo = new CommandClosure<>() { + @Override + public long index() { + return commandIndex; + } + + @Override + public WriteCommand command() { + return (WriteCommand) cmd; + } + + @Override + public void result(@Nullable Serializable r) { + if (r instanceof Throwable) { + res.completeExceptionally((Throwable) r); + } else { + res.complete(r); + } + } + }; + + try { + metaStorageListener.onWrite(List.of(clo).iterator()); + } catch (Throwable e) { + res.completeExceptionally(new IgniteInternalException(e)); + } + + return res; + } + ).when(metaStorageService).run(any()); + + MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory(); + + CommandIdGenerator commandIdGenerator = new CommandIdGenerator(UUID.randomUUID()); + + lenient().doAnswer(invocationClose -> { + Iif iif = invocationClose.getArgument(0); + + MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand() + .iif(iif) + .id(commandIdGenerator.newId()) + .initiatorTime(clock.now()) + .build(); + + return metaStorageService.run(multiInvokeCommand); + }).when(metaStorageManager).invoke(any()); + + when(clusterService.messagingService()).thenAnswer(invocation -> mock(MessagingService.class)); + + assignmentsTimestamp = clock.now().longValue(); + } + + @AfterEach + public void tearDown() { + keyValueStorage.close(); + } + + private static Stream assignmentsProvider() { + return Stream.of( + arguments(nodes1, assignments2, null, null, null, null, assignments1, null), + arguments(nodes1, assignments1, null, null, null, null, null, null), + arguments(nodes1, assignments2, null, assignments3, null, null, assignments3, assignments1), + arguments(nodes1, assignments1, null, assignments3, null, null, assignments3, assignments1), + arguments(nodes1, assignments2, assignments3, null, null, assignments3, assignments1, null), + arguments(nodes1, assignments1, assignments3, null, null, assignments3, assignments1, null), + arguments(nodes1, assignments2, assignments1, null, null, assignments1, null, null), + arguments(nodes1, assignments1, assignments1, null, null, assignments1, null, null), + arguments(nodes1, assignments2, assignments2, null, null, assignments2, assignments1, null), + arguments(nodes1, assignments2, assignments4, assignments3, null, assignments4, assignments3, assignments1), + arguments(nodes1, assignments1, assignments3, assignments2, null, assignments3, assignments2, assignments1), + arguments(nodes1, assignments2, assignments1, assignments3, null, assignments1, assignments3, assignments1), + arguments(nodes1, assignments2, assignments2, assignments3, null, assignments2, assignments3, assignments1), + arguments(nodes1, assignments1, assignments1, assignments2, null, assignments1, assignments2, assignments1), + arguments(nodes1, assignments1, assignments1, assignments2, assignments3, assignments1, assignments2, assignments1), + arguments(nodes1, assignments4, assignments1, assignments2, assignments1, assignments1, assignments2, assignments1), + arguments(nodes2, assignments2, assignments1, assignments2, assignments1, assignments1, assignments2, null), + arguments(nodes2, assignments4, assignments1, assignments2, assignments1, assignments1, assignments2, null) + ); + } + + /** + * Verifies that the metastorage has correct assignments after invoking {@link ZoneRebalanceUtil#updatePendingAssignmentsKeys}. + * Uses {@link #assignmentsProvider()} as the parameter source. + * + * @param nodesForNewAssignments Nodes list to calculate new assignments against. + * @param zoneCfgAssignments Zone's assignment set from the stable configuration. + * @param currentStableAssignments Stable assignments already existing in the metastorage. + * @param currentPendingAssignments Pending assignments already existing in the metastorage. + * @param currentPlannedAssignments Planned assignments already existing in the metastorage. + * @param expectedStableAssignments Stable assignments expected in the metastorage + * after invoking {@link ZoneRebalanceUtil#updatePendingAssignmentsKeys}. + * @param expectedPendingAssignments Pending assignments expected in the metastorage + * after invoking {@link ZoneRebalanceUtil#updatePendingAssignmentsKeys}. + * @param expectedPlannedAssignments Planned assignments expected in the metastorage + * after invoking {@link ZoneRebalanceUtil#updatePendingAssignmentsKeys}. + */ + @DisplayName("Verify that assignments can be updated in metastorage") + @MethodSource("assignmentsProvider") + @ParameterizedTest(name = "[{index}] new nodes: {0}; stable configuration: {1}; assignments in metastorage: [{2}, {3}, {4}];" + + " expected assignments after update: [{5}, {6}, {7}]") + void testAssignmentsUpdate( + Collection nodesForNewAssignments, + Set zoneCfgAssignments, + Set currentStableAssignments, + Set currentPendingAssignments, + Set currentPlannedAssignments, + Set expectedStableAssignments, + Set expectedPendingAssignments, + Set expectedPlannedAssignments + ) { + ZonePartitionId zonePartitionId = new ZonePartitionId(1, 1); + + if (currentStableAssignments != null) { + keyValueStorage.put( + ZoneRebalanceUtil.stablePartAssignmentsKey(zonePartitionId).bytes(), + toBytes(currentStableAssignments, assignmentsTimestamp), + KV_UPDATE_CONTEXT + ); + } + + if (currentPendingAssignments != null) { + keyValueStorage.put( + ZoneRebalanceUtil.pendingPartAssignmentsKey(zonePartitionId).bytes(), + toBytes(currentPendingAssignments, assignmentsTimestamp), + KV_UPDATE_CONTEXT + ); + } + + if (currentPlannedAssignments != null) { + keyValueStorage.put( + ZoneRebalanceUtil.plannedPartAssignmentsKey(zonePartitionId).bytes(), + toBytes(currentPlannedAssignments, assignmentsTimestamp), + KV_UPDATE_CONTEXT + ); + } + + keyValueStorage.put( + ZoneRebalanceUtil.pendingChangeTriggerKey(zonePartitionId).bytes(), + longToBytesKeepingOrder(1), + KV_UPDATE_CONTEXT + ); + + ZoneRebalanceUtil.updatePendingAssignmentsKeys( + zoneDescriptor, + zonePartitionId, + nodesForNewAssignments, + partNum + 1, + replicas, + expectedPendingChangeTriggerKey, + metaStorageManager, + partNum, + zoneCfgAssignments, + assignmentsTimestamp, + Set.of(), + ConsistencyMode.STRONG_CONSISTENCY + ); + + byte[] actualStableBytes = keyValueStorage.get(ZoneRebalanceUtil.stablePartAssignmentsKey(zonePartitionId).bytes()).value(); + Set actualStableAssignments = null; + + if (actualStableBytes != null) { + actualStableAssignments = Assignments.fromBytes(actualStableBytes).nodes(); + } + + byte[] actualPendingBytes = keyValueStorage.get(ZoneRebalanceUtil.pendingPartAssignmentsKey(zonePartitionId).bytes()).value(); + Set actualPendingAssignments = null; + + if (actualPendingBytes != null) { + actualPendingAssignments = Assignments.fromBytes(actualPendingBytes).nodes(); + } + + byte[] actualPlannedBytes = keyValueStorage.get(ZoneRebalanceUtil.plannedPartAssignmentsKey(zonePartitionId).bytes()).value(); + Set actualPlannedAssignments = null; + + if (actualPlannedBytes != null) { + actualPlannedAssignments = Assignments.fromBytes(actualPlannedBytes).nodes(); + } + + byte[] pendingChangeTriggerKey = keyValueStorage.get(ZoneRebalanceUtil.pendingChangeTriggerKey(zonePartitionId).bytes()).value(); + long actualPendingChangeTrigger = bytesToLongKeepingOrder(pendingChangeTriggerKey); + + LOG.info("stableAssignments {}", actualStableAssignments); + LOG.info("pendingAssignments {}", actualPendingAssignments); + LOG.info("plannedAssignments {}", actualPlannedAssignments); + + if (expectedStableAssignments != null) { + assertNotNull(actualStableBytes); + assertEquals(actualStableAssignments, expectedStableAssignments); + } else { + assertNull(actualStableBytes); + } + + if (expectedPendingAssignments != null) { + assertNotNull(actualPendingBytes); + assertEquals(actualPendingAssignments, expectedPendingAssignments); + } else { + assertNull(actualPendingBytes); + } + + if (expectedPlannedAssignments != null) { + assertNotNull(actualPlannedBytes); + assertEquals(actualPlannedAssignments, expectedPlannedAssignments); + } else { + assertNull(actualPlannedBytes); + } + + assertEquals(expectedPendingChangeTriggerKey, actualPendingChangeTrigger); + } +}