Skip to content

Commit

Permalink
IGNITE-24472 Use ReplicationGroupId to represent enlisted partitions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rpuch authored Feb 12, 2025
1 parent ffb8449 commit 3c2cb8d
Show file tree
Hide file tree
Showing 45 changed files with 406 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
Expand Down Expand Up @@ -85,7 +86,7 @@ public UUID id() {
}

@Override
public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) {
public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndConsistencyToken(ReplicationGroupId replicationGroupId) {
return null;
}

Expand All @@ -106,7 +107,8 @@ public TablePartitionId commitPartition() {

@Override
public IgniteBiTuple<ClusterNode, Long> enlist(
TablePartitionId tablePartitionId,
ReplicationGroupId replicationGroupId,
int tableId,
IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken) {
return null;
}
Expand Down Expand Up @@ -203,7 +205,8 @@ public CompletableFuture<Void> finish(
HybridTimestampTracker timestampTracker,
TablePartitionId commitPartition,
boolean commit,
Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups,
Map<ReplicationGroupId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups,
Set<Integer> enlistedTableIds,
UUID txId
) {
return nullCompletedFuture();
Expand All @@ -212,7 +215,7 @@ public CompletableFuture<Void> finish(
@Override
public CompletableFuture<Void> cleanup(
ReplicationGroupId commitPartitionId,
Map<TablePartitionId, String> enlistedPartitions,
Map<ReplicationGroupId, String> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
Expand All @@ -223,7 +226,7 @@ public CompletableFuture<Void> cleanup(
@Override
public CompletableFuture<Void> cleanup(
TablePartitionId commitPartitionId,
Collection<TablePartitionId> enlistedPartitions,
Collection<ReplicationGroupId> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ void testLocalRaftLogReapplication() throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@ValueSource(booleans = {false, true})
void txFinishCommandGetsReplicated(boolean commit) throws Exception {
startCluster(3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
Expand Down Expand Up @@ -47,9 +47,8 @@
import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
Expand Down Expand Up @@ -122,14 +121,14 @@ public TxFinishReplicaRequestHandler(
*/
public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) {
// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups());
Map<ReplicationGroupId, String> enlistedGroups = asReplicationGroupIdToStringMap(request.groups());

UUID txId = request.txId();

if (request.commit()) {
HybridTimestamp commitTimestamp = request.commitTimestamp();

return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp)
return schemaCompatValidator.validateCommit(txId, request.tableIds(), commitTimestamp)
.thenCompose(validationResult ->
finishAndCleanup(
enlistedGroups,
Expand All @@ -146,18 +145,18 @@ public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest reques
}
}

private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
var result = new HashMap<TablePartitionId, String>(IgniteUtils.capacity(messages.size()));
private static Map<ReplicationGroupId, String> asReplicationGroupIdToStringMap(Map<ReplicationGroupIdMessage, String> messages) {
var result = new HashMap<ReplicationGroupId, String>(IgniteUtils.capacity(messages.size()));

for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
result.put(e.getKey().asTablePartitionId(), e.getValue());
for (Entry<ReplicationGroupIdMessage, String> e : messages.entrySet()) {
result.put(e.getKey().asReplicationGroupId(), e.getValue());
}

return result;
}

private CompletableFuture<TransactionResult> finishAndCleanup(
Map<TablePartitionId, String> enlistedPartitions,
Map<ReplicationGroupId, String> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
Expand Down Expand Up @@ -248,7 +247,7 @@ private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult
* @return Future to wait of the finish.
*/
private CompletableFuture<TransactionResult> finishTransaction(
Collection<TablePartitionId> partitionIds,
Collection<ReplicationGroupId> partitionIds,
UUID txId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp
Expand Down Expand Up @@ -298,7 +297,7 @@ private CompletableFuture<Object> applyFinishCommand(
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
int catalogVersion,
List<TablePartitionIdMessage> partitionIds
List<ReplicationGroupIdMessage> partitionIds
) {
HybridTimestamp now = clockService.now();
FinishTxCommandBuilder finishTxCmdBldr = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand()
Expand All @@ -315,23 +314,23 @@ private CompletableFuture<Object> applyFinishCommand(
return raftCommandApplicator.applyCmdWithExceptionHandling(finishTxCmdBldr.build());
}

private static List<TablePartitionIdMessage> toPartitionIdMessage(Collection<TablePartitionId> partitionIds) {
List<TablePartitionIdMessage> list = new ArrayList<>(partitionIds.size());
private static List<ReplicationGroupIdMessage> toPartitionIdMessage(Collection<ReplicationGroupId> partitionIds) {
List<ReplicationGroupIdMessage> list = new ArrayList<>(partitionIds.size());

for (TablePartitionId partitionId : partitionIds) {
list.add(tablePartitionId(partitionId));
for (ReplicationGroupId partitionId : partitionIds) {
list.add(replicationGroupId(partitionId));
}

return list;
}

/**
* Method to convert from {@link TablePartitionId} object to command-based {@link TablePartitionIdMessage} object.
* Method to convert from {@link ReplicationGroupId} object to command-based {@link ReplicationGroupIdMessage} object.
*
* @param tablePartId {@link TablePartitionId} object to convert to {@link TablePartitionIdMessage}.
* @return {@link TablePartitionIdMessage} object converted from argument.
* @param replicationGroupId {@link ReplicationGroupId} object to convert to {@link ReplicationGroupIdMessage}.
* @return {@link ReplicationGroupIdMessage} object converted from argument.
*/
private static TablePartitionIdMessage tablePartitionId(TablePartitionId tablePartId) {
return toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartId);
private static ReplicationGroupIdMessage replicationGroupId(ReplicationGroupId replicationGroupId) {
return toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.ignite.internal.partition.replicator;

import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.Map;
import java.util.UUID;
Expand All @@ -36,6 +36,7 @@
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.TableAware;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.TxManager;
Expand Down Expand Up @@ -93,10 +94,14 @@ public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID send
return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
.thenApply(res -> new ReplicaResult(res, null));
} else {
LOG.debug("Non table request is not supported by the zone partition yet " + request);
if (request instanceof ReplicaSafeTimeSyncRequest) {
LOG.debug("Non table request is not supported by the zone partition yet " + request);
} else {
LOG.warn("Non table request is not supported by the zone partition yet " + request);
}
}

return nullCompletedFuture();
return completedFuture(new ReplicaResult(null, null));
} else {
int partitionId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -38,5 +38,5 @@ public interface FinishTxCommand extends PartitionCommand {
@Nullable HybridTimestamp commitTimestamp();

/** Returns ordered replication groups IDs. */
List<TablePartitionIdMessage> partitionIds();
List<ReplicationGroupIdMessage> partitionIds();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
Expand Down Expand Up @@ -111,11 +112,11 @@ public IgniteBiTuple<Serializable, Boolean> handle(FinishTxCommand cmd, long com
return new IgniteBiTuple<>(new TransactionResult(stateToSet, cmd.commitTimestamp()), true);
}

private static List<TablePartitionId> fromPartitionIdMessage(List<TablePartitionIdMessage> partitionIds) {
List<TablePartitionId> list = new ArrayList<>(partitionIds.size());
private static List<ReplicationGroupId> fromPartitionIdMessage(List<ReplicationGroupIdMessage> partitionIds) {
List<ReplicationGroupId> list = new ArrayList<>(partitionIds.size());

for (TablePartitionIdMessage partitionIdMessage : partitionIds) {
list.add(partitionIdMessage.asTablePartitionId());
for (ReplicationGroupIdMessage partitionIdMessage : partitionIds) {
list.add(partitionIdMessage.asReplicationGroupId());
}

return list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.ignite.internal.partition.replicator.schemacompat;

import static java.util.stream.Collectors.toSet;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand All @@ -34,7 +31,6 @@
import org.apache.ignite.internal.partition.replicator.schema.FullTableSchema;
import org.apache.ignite.internal.partition.replicator.schema.TableDefinitionDiff;
import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.TransactionIds;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -74,28 +70,24 @@ public SchemaCompatibilityValidator(
* (identified by the commit timestamp).
*
* @param txId ID of the transaction that gets validated.
* @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
* @param enlistedTableIds IDs of the tables that are enlisted with the transaction.
* @param commitTimestamp Commit timestamp.
* @return Future of validation result.
*/
public CompletableFuture<CompatValidationResult> validateCommit(
UUID txId,
Collection<TablePartitionId> enlistedGroupIds,
Set<Integer> enlistedTableIds,
HybridTimestamp commitTimestamp
) {
HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);

Set<Integer> tableIds = enlistedGroupIds.stream()
.map(TablePartitionId::tableId)
.collect(toSet());

// Using compareTo() instead of after()/begin() because the latter methods take clock skew into account
// which only makes sense when comparing 'unrelated' timestamps. beginTs and commitTs have a causal relationship,
// so we don't need to account for clock skew.
assert commitTimestamp.compareTo(beginTimestamp) > 0;

return schemaSyncService.waitForMetadataCompleteness(commitTimestamp)
.thenApply(ignored -> validateCommit(tableIds, commitTimestamp, beginTimestamp));
.thenApply(ignored -> validateCommit(enlistedTableIds, commitTimestamp, beginTimestamp));
}

private CompatValidationResult validateCommit(Set<Integer> tableIds, HybridTimestamp commitTimestamp, HybridTimestamp beginTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ private void assertForwardCompatibleChangeAllowsCommitting(SchemaChangeSource ch
when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, beginTimestamp, commitTimestamp))
.thenReturn(changeSource.schemaVersions());

CompletableFuture<CompatValidationResult> resultFuture = validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp);
CompletableFuture<CompatValidationResult> resultFuture = validator.validateCommit(
txId,
Set.of(tablePartitionId.tableId()),
commitTimestamp
);

assertThat(resultFuture, willCompleteSuccessfully());

Expand All @@ -140,7 +144,11 @@ private void assertForwardIncompatibleChangeDisallowsCommitting(SchemaChangeSour
when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, beginTimestamp, commitTimestamp))
.thenReturn(changeSource.schemaVersions());

CompletableFuture<CompatValidationResult> resultFuture = validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp);
CompletableFuture<CompatValidationResult> resultFuture = validator.validateCommit(
txId,
Set.of(tablePartitionId.tableId()),
commitTimestamp
);

assertThat(resultFuture, willCompleteSuccessfully());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ private void handleReplicaRequest(ReplicaRequest request, ClusterNode sender, @N

CompletableFuture<ReplicaResult> resFut = replica.processRequest(request, sender.id());

resFut.whenComplete((res, ex) -> {
resFut.handle((res, ex) -> {
NetworkMessage msg;

if (ex == null) {
Expand Down Expand Up @@ -517,6 +517,12 @@ private void handleReplicaRequest(ReplicaRequest request, ClusterNode sender, @N
clusterNetSvc.messagingService().send(senderConsistentId, ChannelType.DEFAULT, msg0);
});
}

return null;
}).whenComplete((res, ex) -> {
if (ex != null) {
failureManager.process(new FailureContext(CRITICAL_ERROR, ex));
}
});
} finally {
leaveBusy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.replicator.message;

import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;

Expand Down Expand Up @@ -55,4 +56,24 @@ public static ZonePartitionIdMessage toZonePartitionIdMessage(
.partitionId(zonePartitionId.partitionId())
.build();
}

/**
* Converts to a network message.
*
* @param messagesFactory Messages factory.
* @param replicationGroupId Replication group ID for a given partition.
* @return New instance of network message.
*/
public static ReplicationGroupIdMessage toReplicationGroupIdMessage(
ReplicaMessagesFactory messagesFactory,
ReplicationGroupId replicationGroupId
) {
assert replicationGroupId instanceof TablePartitionId || replicationGroupId instanceof ZonePartitionId : replicationGroupId;

if (replicationGroupId instanceof TablePartitionId) {
return toTablePartitionIdMessage(messagesFactory, (TablePartitionId) replicationGroupId);
} else {
return toZonePartitionIdMessage(messagesFactory, (ZonePartitionId) replicationGroupId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,7 @@ private InternalTransaction startTxWithEnlistedPartition(int partId, boolean rea

tx.enlist(
tblPartId,
tblPartId.tableId(),
new IgniteBiTuple<>(
ignite.clusterNodes().stream().filter(n -> n.name().equals(primaryReplica.getLeaseholder()))
.findFirst().orElseThrow(),
Expand Down
Loading

0 comments on commit 3c2cb8d

Please sign in to comment.