Skip to content

Commit

Permalink
[IGNITE-24413] More HA tests (#5193)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cyrill authored Feb 12, 2025
1 parent 6761ca9 commit ffb8449
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
Expand Down Expand Up @@ -1056,7 +1056,7 @@ private static Set<Assignment> getPartitionClusterNodes(Node node, int partNum)

/**
 * Resolves the table ID by name on the given node and, when one is found, blocks on the {@code RebalanceUtil} future
 * to obtain the assignments of the requested partition.
 *
 * @param node Node whose meta storage manager is queried.
 * @param tableName Name of the table.
 * @param partNum Partition number.
 * @return Assignments of the partition, or an empty set when the table ID lookup yields {@code null}.
 */
private static Set<Assignment> getPartitionClusterNodes(Node node, String tableName, int partNum) {
return Optional.ofNullable(getTableId(node, tableName))
.map(tableId -> partitionAssignments(node.metaStorageManager, tableId, partNum).join())
.map(tableId -> stablePartitionAssignments(node.metaStorageManager, tableId, partNum).join())
.orElse(Set.of());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
Expand Down Expand Up @@ -168,7 +168,7 @@ private void waitForStableAssignmentsInMetastore(Set<String> expectedNodes, int

assertTrue(waitForCondition(() -> {
Set<String> assignments =
await(partitionAssignments(unwrapIgniteImpl(cluster.aliveNode()).metaStorageManager(), table, 0))
await(stablePartitionAssignments(unwrapIgniteImpl(cluster.aliveNode()).metaStorageManager(), table, 0))
.stream()
.map(Assignment::consistentId)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {
* @param partitionId Partition ID.
* @return Future with partition assignments as a value.
*/
public static CompletableFuture<Set<Assignment>> partitionAssignments(
public static CompletableFuture<Set<Assignment>> stablePartitionAssignments(
MetaStorageManager metaStorageManager,
int tableId,
int partitionId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
Expand Down Expand Up @@ -102,7 +102,7 @@ private static void waitForStableAssignmentsChangeInMetastore(
Set<Assignment>[] actualAssignmentsHolder = new Set[]{Set.of()};

assertTrue(waitForCondition(() -> {
CompletableFuture<Set<Assignment>> partitionAssignmentsFuture = partitionAssignments(
CompletableFuture<Set<Assignment>> partitionAssignmentsFuture = stablePartitionAssignments(
node.metaStorageManager(),
tableId,
partitionId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,30 @@
package org.apache.ignite.internal.table.distributed.disaster;

import static java.util.Collections.emptySet;
import static java.util.Map.of;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
import static org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.RECOVERY_TRIGGER_KEY;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand All @@ -44,6 +50,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -58,15 +65,30 @@
import org.apache.ignite.internal.distributionzones.Node;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;

/** Parent for tests of HA zones feature. */
public abstract class AbstractHighAvailablePartitionsRecoveryTest extends ClusterPerTestIntegrationTest {
Expand All @@ -82,6 +104,8 @@ public abstract class AbstractHighAvailablePartitionsRecoveryTest extends Cluste

private static final int PARTITIONS_NUMBER = 2;

private static final int ENTRIES = 2;

static Set<Integer> PARTITION_IDS = IntStream
.range(0, PARTITIONS_NUMBER)
.boxed()
Expand Down Expand Up @@ -233,7 +257,7 @@ private static Set<String> assignmentsFromEntry(Entry entry) {

/**
 * Resolves the table ID by name through the node's catalog manager at the current clock value and, when one is found,
 * blocks on the {@code RebalanceUtil} future to obtain the assignments of the requested partition.
 *
 * @param node Node whose catalog manager and meta storage manager are queried.
 * @param tableName Name of the table.
 * @param partNum Partition number.
 * @return Assignments of the partition, or an empty set when the table ID lookup yields {@code null}.
 */
private Set<Assignment> getPartitionClusterNodes(IgniteImpl node, String tableName, int partNum) {
return Optional.ofNullable(getTableId(node.catalogManager(), tableName, clock.nowLong()))
.map(tableId -> partitionAssignments(node.metaStorageManager(), tableId, partNum).join())
.map(tableId -> stablePartitionAssignments(node.metaStorageManager(), tableId, partNum).join())
.orElse(Set.of());
}

Expand Down Expand Up @@ -416,4 +440,70 @@ final Set<String> nodeNames(Integer... indexes) {
.map(i -> node(i).name())
.collect(Collectors.toUnmodifiableSet());
}

/**
 * Puts {@code ENTRIES} rows into the table through its key-value view and reads each one back.
 *
 * <p>Row {@code i} has key {@code id = i} and value {@code val = i + offset}. A failure whose unwrapped cause is an
 * {@link IgniteException} accepted by {@code isPrimaryReplicaHasChangedException}, a {@link TransactionException} or a
 * {@link TimeoutException} is collected; any other failure fails the test immediately.
 *
 * @param table Table to write to.
 * @param offset Number added to the row index to build the stored value.
 * @return Collected tolerated failures, empty when every row was written and read back.
 */
static List<Throwable> insertValues(Table table, int offset) {
    KeyValueView<Tuple, Tuple> kvView = table.keyValueView();

    List<Throwable> toleratedErrors = new ArrayList<>();

    for (int rowIdx = 0; rowIdx < ENTRIES; rowIdx++) {
        Tuple keyTuple = Tuple.create(of("id", rowIdx));
        Tuple valueTuple = Tuple.create(of("val", rowIdx + offset));

        CompletableFuture<Void> putFuture = kvView.putAsync(null, keyTuple, valueTuple);

        try {
            assertThat(putFuture, willCompleteSuccessfully());

            // Read the row back to make sure the write is visible.
            Tuple storedValue = kvView.get(null, keyTuple);

            assertNotNull(storedValue);
        } catch (Throwable t) {
            Throwable rootCause = unwrapCause(t);

            boolean primaryReplicaChanged = rootCause instanceof IgniteException
                    && isPrimaryReplicaHasChangedException((IgniteException) rootCause);

            boolean tolerated = primaryReplicaChanged
                    || rootCause instanceof TransactionException
                    || rootCause instanceof TimeoutException;

            if (tolerated) {
                toleratedErrors.add(rootCause);
            } else {
                fail("Unexpected exception", t);
            }
        }
    }

    return toleratedErrors;
}

/**
 * Runs the per-node value presence check for each of the given node indexes, in the order they are passed.
 *
 * @param ts Timestamp passed to the reads.
 * @param table Table to read from.
 * @param indexes Indexes of the cluster nodes to check.
 */
void assertValuesPresentOnNodes(HybridTimestamp ts, Table table, Integer... indexes) {
    for (int pos = 0; pos < indexes.length; pos++) {
        assertValuesPresentOnNode(table, ts, indexes[pos]);
    }
}

/**
 * Reads every key {@code id = 0 .. ENTRIES - 1} through the internal table, passing the given timestamp and the
 * cluster node of the target Ignite instance, and asserts that each read completes with a non-null row.
 *
 * @param table Table to read from.
 * @param ts Timestamp passed to the reads.
 * @param targetNodeIndex Index of the node to check.
 */
private void assertValuesPresentOnNode(Table table, HybridTimestamp ts, int targetNodeIndex) {
    IgniteImpl ignite = unwrapIgniteImpl(node(targetNodeIndex));

    TableImpl unwrappedTable = unwrapTableImpl(table);
    InternalTable internal = unwrappedTable.internalTable();

    for (int keyId = 0; keyId < ENTRIES; keyId++) {
        Row keyRow = marshalKey(unwrappedTable, Tuple.create(of("id", keyId)));

        CompletableFuture<BinaryRow> readFuture = internal.get(keyRow, ts, ignite.node());

        assertThat(readFuture, willCompleteSuccessfully());

        assertNotNull(readFuture.join());
    }
}

/**
 * Marshals a key-only tuple into a row using the last known schema of the table.
 *
 * @param table Table whose schema registry supplies the schema.
 * @param key Key tuple.
 * @return Row produced by the tuple marshaller for the key with a {@code null} value part.
 */
private static Row marshalKey(TableViewInternal table, Tuple key) {
    return new TupleMarshallerImpl(table.schemaView().lastKnownSchema()).marshal(key, null);
}

/**
 * Checks whether the error code extracted from the exception is {@code Replicator.REPLICA_MISS_ERR}.
 *
 * @param cause Exception to inspect.
 * @return {@code true} if the extracted code equals {@code Replicator.REPLICA_MISS_ERR}.
 */
private static boolean isPrimaryReplicaHasChangedException(IgniteException cause) {
    var errorCode = ExceptionUtils.extractCodeFrom(cause);

    return errorCode == Replicator.REPLICA_MISS_ERR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.table.distributed.disaster;

import static java.lang.String.format;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
Expand All @@ -26,6 +27,8 @@
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -43,6 +46,7 @@
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.Test;

/** Test for the HA zones recovery. */
Expand Down Expand Up @@ -297,6 +301,64 @@ void testHaZoneScaleDownNodesDoNotRemovedFromStable() throws InterruptedExceptio
waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, PARTITION_IDS, allNodes);
}

/**
 * Inserts data into the HA table, raises the zone replica count to 4 after starting a fourth node, then stops that
 * node and sets the scale-down timer, checking stable assignments and data presence after each step.
 */
@Test
void testRebalanceInHaZone() throws InterruptedException {
    createHaZoneWithTable();

    startNode(3);

    IgniteImpl node0 = igniteImpl(0);
    Table haTable = node0.tables().table(HA_TABLE_NAME);

    List<Throwable> insertErrors = insertValues(haTable, 0);
    assertThat(insertErrors, is(empty()));

    // Snapshot of the running node names while node 3 is up.
    Set<String> namesWithFourthNode = runningNodes()
            .map(Ignite::name)
            .collect(Collectors.toUnmodifiableSet());

    String setReplicasSql = format("ALTER ZONE %s SET REPLICAS=%d", HA_ZONE_NAME, 4);
    executeSql(setReplicasSql);

    waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, namesWithFourthNode);

    assertValuesPresentOnNodes(node0.clock().now(), haTable, 0, 1, 2, 3);

    stopNode(3);

    // Snapshot of the running node names after node 3 was stopped.
    Set<String> namesWithoutFourthNode = runningNodes()
            .map(Ignite::name)
            .collect(Collectors.toUnmodifiableSet());

    String setScaleDownSql = format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", HA_ZONE_NAME, 1);
    executeSql(setScaleDownSql);

    waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, PARTITION_IDS, namesWithoutFourthNode);

    assertValuesPresentOnNodes(node0.clock().now(), haTable, 0, 1, 2);
}

/**
 * Starts nodes 3 and 4, inserts data into the HA table, restarts nodes 0 to 4 sequentially and then checks
 * that stable assignments equal the set of running nodes and that the data is readable on every node.
 */
@Test
void testRestartNodesOneByOne() throws InterruptedException {
    startNode(3);
    startNode(4);

    createHaZoneWithTable();

    IgniteImpl nodeBeforeRestart = igniteImpl(0);

    Table haTable = nodeBeforeRestart.tables().table(HA_TABLE_NAME);

    List<Throwable> insertErrors = insertValues(haTable, 0);
    assertThat(insertErrors, is(empty()));

    for (int nodeIdx = 0; nodeIdx < 5; nodeIdx++) {
        restartNode(nodeIdx);
    }

    Set<String> runningNodeNames = runningNodes()
            .map(Ignite::name)
            .collect(Collectors.toUnmodifiableSet());

    // Node 0 was restarted in the loop above, so fetch its handle and the table again.
    IgniteImpl nodeAfterRestart = igniteImpl(0);

    waitAndAssertStableAssignmentsOfPartitionEqualTo(nodeAfterRestart, HA_TABLE_NAME, PARTITION_IDS, runningNodeNames);

    assertValuesPresentOnNodes(
            nodeAfterRestart.clock().now(),
            nodeAfterRestart.tables().table(HA_TABLE_NAME),
            0, 1, 2, 3, 4
    );
}

@Test
void testScaleDownTimerIsWorkingForHaZone() throws InterruptedException {
IgniteImpl node = igniteImpl(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
import org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum;
import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
Expand Down Expand Up @@ -1929,7 +1931,7 @@ private static List<Throwable> insertValues(Table table, int partitionId, int of
}

/**
 * Tells whether the given exception reports that the primary replica has changed.
 *
 * @param cause Exception to inspect.
 * @return {@code true} if the exception is recognized as a primary-replica-changed failure.
 */
private static boolean isPrimaryReplicaHasChangedException(IgniteException cause) {
return cause.getMessage() != null && cause.getMessage().contains("The primary replica has changed");
return ExceptionUtils.extractCodeFrom(cause) == Replicator.REPLICA_MISS_ERR;
}

private void startNodesInParallel(int... nodeIndexes) {
Expand Down

0 comments on commit ffb8449

Please sign in to comment.