diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index f8604ed704b..d9740609849 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -85,7 +86,7 @@ public UUID id() { } @Override - public IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) { + public IgniteBiTuple enlistedNodeAndConsistencyToken(ReplicationGroupId replicationGroupId) { return null; } @@ -106,7 +107,8 @@ public TablePartitionId commitPartition() { @Override public IgniteBiTuple enlist( - TablePartitionId tablePartitionId, + ReplicationGroupId replicationGroupId, + int tableId, IgniteBiTuple nodeAndConsistencyToken) { return null; } @@ -203,7 +205,8 @@ public CompletableFuture finish( HybridTimestampTracker timestampTracker, TablePartitionId commitPartition, boolean commit, - Map> enlistedGroups, + Map> enlistedGroups, + Set enlistedTableIds, UUID txId ) { return nullCompletedFuture(); @@ -212,7 +215,7 @@ public CompletableFuture finish( @Override public CompletableFuture cleanup( ReplicationGroupId commitPartitionId, - Map enlistedPartitions, + Map enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId @@ -223,7 +226,7 @@ public CompletableFuture cleanup( @Override public CompletableFuture cleanup( TablePartitionId commitPartitionId, - Collection enlistedPartitions, + Collection enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java index 1106f4d5a9c..04ee6420c82 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java @@ -475,7 +475,7 @@ void testLocalRaftLogReapplication() throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) + @ValueSource(booleans = {false, true}) void txFinishCommandGetsReplicated(boolean commit) throws Exception { startCluster(3); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java index a425b562ce4..dcf5bf37c7e 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java @@ -19,7 +19,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; -import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage; import static org.apache.ignite.internal.tx.TxState.ABORTED; import static org.apache.ignite.internal.tx.TxState.COMMITTED; import static org.apache.ignite.internal.tx.TxState.isFinalState; @@ -47,9 +47,8 @@ import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator; import org.apache.ignite.internal.raft.service.RaftCommandRunner; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; -import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.schema.SchemaSyncService; import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException; import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; @@ -122,14 +121,14 @@ public TxFinishReplicaRequestHandler( */ public CompletableFuture handle(TxFinishReplicaRequest request) { // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast - Map enlistedGroups = asTablePartitionIdStringMap(request.groups()); + Map enlistedGroups = asReplicationGroupIdToStringMap(request.groups()); UUID txId = request.txId(); if (request.commit()) { HybridTimestamp commitTimestamp = request.commitTimestamp(); - return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp) + return schemaCompatValidator.validateCommit(txId, request.tableIds(), commitTimestamp) .thenCompose(validationResult -> finishAndCleanup( enlistedGroups, @@ -146,18 +145,18 @@ public CompletableFuture handle(TxFinishReplicaRequest reques } } - private static Map asTablePartitionIdStringMap(Map messages) { - var result = new HashMap(IgniteUtils.capacity(messages.size())); + private static Map asReplicationGroupIdToStringMap(Map messages) { + var result = new HashMap(IgniteUtils.capacity(messages.size())); - for (Entry e : messages.entrySet()) { - result.put(e.getKey().asTablePartitionId(), e.getValue()); + for (Entry e : messages.entrySet()) { + result.put(e.getKey().asReplicationGroupId(), e.getValue()); } return result; } private CompletableFuture finishAndCleanup( - Map enlistedPartitions, + Map enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId @@ -248,7 +247,7 @@ private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult * @return Future to wait of the finish. */ private CompletableFuture finishTransaction( - Collection partitionIds, + Collection partitionIds, UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp @@ -298,7 +297,7 @@ private CompletableFuture applyFinishCommand( boolean commit, @Nullable HybridTimestamp commitTimestamp, int catalogVersion, - List partitionIds + List partitionIds ) { HybridTimestamp now = clockService.now(); FinishTxCommandBuilder finishTxCmdBldr = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() @@ -315,23 +314,23 @@ private CompletableFuture applyFinishCommand( return raftCommandApplicator.applyCmdWithExceptionHandling(finishTxCmdBldr.build()); } - private static List toPartitionIdMessage(Collection partitionIds) { - List list = new ArrayList<>(partitionIds.size()); + private static List toPartitionIdMessage(Collection partitionIds) { + List list = new ArrayList<>(partitionIds.size()); - for (TablePartitionId partitionId : partitionIds) { - list.add(tablePartitionId(partitionId)); + for (ReplicationGroupId partitionId : partitionIds) { + list.add(replicationGroupId(partitionId)); } return list; } /** - * Method to convert from {@link TablePartitionId} object to command-based {@link TablePartitionIdMessage} object. + * Method to convert from {@link ReplicationGroupId} object to command-based {@link ReplicationGroupIdMessage} object. * - * @param tablePartId {@link TablePartitionId} object to convert to {@link TablePartitionIdMessage}. - * @return {@link TablePartitionIdMessage} object converted from argument. + * @param replicationGroupId {@link ReplicationGroupId} object to convert to {@link ReplicationGroupIdMessage}. + * @return {@link ReplicationGroupIdMessage} object converted from argument. */ - private static TablePartitionIdMessage tablePartitionId(TablePartitionId tablePartId) { - return toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartId); + private static ReplicationGroupIdMessage replicationGroupId(ReplicationGroupId replicationGroupId) { + return toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId); } } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java index 9b08acb0e70..fdec32b6444 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.partition.replicator; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static java.util.concurrent.CompletableFuture.completedFuture; import java.util.Map; import java.util.UUID; @@ -36,6 +36,7 @@ import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; import org.apache.ignite.internal.replicator.message.TableAware; import org.apache.ignite.internal.schema.SchemaSyncService; import org.apache.ignite.internal.tx.TxManager; @@ -93,10 +94,14 @@ public CompletableFuture invoke(ReplicaRequest request, UUID send return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request) .thenApply(res -> new ReplicaResult(res, null)); } else { - LOG.debug("Non table request is not supported by the zone partition yet " + request); + if (request instanceof ReplicaSafeTimeSyncRequest) { + LOG.debug("Non table request is not supported by the zone partition yet " + request); + } else { + LOG.warn("Non table request is not supported by the zone partition yet " + request); + } } - return nullCompletedFuture(); + return completedFuture(new ReplicaResult(null, null)); } else { int partitionId; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/FinishTxCommand.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/FinishTxCommand.java index 2e2d858c18f..0050125a8e3 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/FinishTxCommand.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/FinishTxCommand.java @@ -21,7 +21,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; -import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.jetbrains.annotations.Nullable; /** @@ -38,5 +38,5 @@ public interface FinishTxCommand extends PartitionCommand { @Nullable HybridTimestamp commitTimestamp(); /** Returns ordered replication groups IDs. */ - List partitionIds(); + List partitionIds(); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java index 0dbd4744318..3c48fd36bd7 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java @@ -30,8 +30,9 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.tx.TransactionResult; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxMeta; @@ -111,11 +112,11 @@ public IgniteBiTuple handle(FinishTxCommand cmd, long com return new IgniteBiTuple<>(new TransactionResult(stateToSet, cmd.commitTimestamp()), true); } - private static List fromPartitionIdMessage(List partitionIds) { - List list = new ArrayList<>(partitionIds.size()); + private static List fromPartitionIdMessage(List partitionIds) { + List list = new ArrayList<>(partitionIds.size()); - for (TablePartitionIdMessage partitionIdMessage : partitionIds) { - list.add(partitionIdMessage.asTablePartitionId()); + for (ReplicationGroupIdMessage partitionIdMessage : partitionIds) { + list.add(partitionIdMessage.asReplicationGroupId()); } return list; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java index 989b6d1a982..65c41c506dd 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal.partition.replicator.schemacompat; -import static java.util.stream.Collectors.toSet; - -import java.util.Collection; import java.util.List; import java.util.Set; import java.util.UUID; @@ -34,7 +31,6 @@ import org.apache.ignite.internal.partition.replicator.schema.FullTableSchema; import org.apache.ignite.internal.partition.replicator.schema.TableDefinitionDiff; import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.SchemaSyncService; import org.apache.ignite.internal.tx.TransactionIds; import org.jetbrains.annotations.Nullable; @@ -74,28 +70,24 @@ public SchemaCompatibilityValidator( * (identified by the commit timestamp). * * @param txId ID of the transaction that gets validated. - * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction. + * @param enlistedTableIds IDs of the tables that are enlisted with the transaction. * @param commitTimestamp Commit timestamp. * @return Future of validation result. */ public CompletableFuture validateCommit( UUID txId, - Collection enlistedGroupIds, + Set enlistedTableIds, HybridTimestamp commitTimestamp ) { HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId); - Set tableIds = enlistedGroupIds.stream() - .map(TablePartitionId::tableId) - .collect(toSet()); - // Using compareTo() instead of after()/begin() because the latter methods take clock skew into account // which only makes sense when comparing 'unrelated' timestamps. beginTs and commitTs have a causal relationship, // so we don't need to account for clock skew. assert commitTimestamp.compareTo(beginTimestamp) > 0; return schemaSyncService.waitForMetadataCompleteness(commitTimestamp) - .thenApply(ignored -> validateCommit(tableIds, commitTimestamp, beginTimestamp)); + .thenApply(ignored -> validateCommit(enlistedTableIds, commitTimestamp, beginTimestamp)); } private CompatValidationResult validateCommit(Set tableIds, HybridTimestamp commitTimestamp, HybridTimestamp beginTimestamp) { diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidatorTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidatorTest.java index 22a5ccf43f9..3048a74be0c 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidatorTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidatorTest.java @@ -120,7 +120,11 @@ private void assertForwardCompatibleChangeAllowsCommitting(SchemaChangeSource ch when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, beginTimestamp, commitTimestamp)) .thenReturn(changeSource.schemaVersions()); - CompletableFuture resultFuture = validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp); + CompletableFuture resultFuture = validator.validateCommit( + txId, + Set.of(tablePartitionId.tableId()), + commitTimestamp + ); assertThat(resultFuture, willCompleteSuccessfully()); @@ -140,7 +144,11 @@ private void assertForwardIncompatibleChangeDisallowsCommitting(SchemaChangeSour when(schemasSource.tableSchemaVersionsBetween(TABLE_ID, beginTimestamp, commitTimestamp)) .thenReturn(changeSource.schemaVersions()); - CompletableFuture resultFuture = validator.validateCommit(txId, List.of(tablePartitionId), commitTimestamp); + CompletableFuture resultFuture = validator.validateCommit( + txId, + Set.of(tablePartitionId.tableId()), + commitTimestamp + ); assertThat(resultFuture, willCompleteSuccessfully()); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 8f637e2bda8..151683efb98 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -476,7 +476,7 @@ private void handleReplicaRequest(ReplicaRequest request, ClusterNode sender, @N CompletableFuture resFut = replica.processRequest(request, sender.id()); - resFut.whenComplete((res, ex) -> { + resFut.handle((res, ex) -> { NetworkMessage msg; if (ex == null) { @@ -517,6 +517,12 @@ private void handleReplicaRequest(ReplicaRequest request, ClusterNode sender, @N clusterNetSvc.messagingService().send(senderConsistentId, ChannelType.DEFAULT, msg0); }); } + + return null; + }).whenComplete((res, ex) -> { + if (ex != null) { + failureManager.process(new FailureContext(CRITICAL_ERROR, ex)); + } }); } finally { leaveBusy(); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageUtils.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageUtils.java index 665f1317335..fe622eee09a 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageUtils.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.replicator.message; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.ZonePartitionId; @@ -55,4 +56,24 @@ public static ZonePartitionIdMessage toZonePartitionIdMessage( .partitionId(zonePartitionId.partitionId()) .build(); } + + /** + * Converts to a network message. + * + * @param messagesFactory Messages factory. + * @param replicationGroupId Replication group ID for a given partition. + * @return New instance of network message. + */ + public static ReplicationGroupIdMessage toReplicationGroupIdMessage( + ReplicaMessagesFactory messagesFactory, + ReplicationGroupId replicationGroupId + ) { + assert replicationGroupId instanceof TablePartitionId || replicationGroupId instanceof ZonePartitionId : replicationGroupId; + + if (replicationGroupId instanceof TablePartitionId) { + return toTablePartitionIdMessage(messagesFactory, (TablePartitionId) replicationGroupId); + } else { + return toZonePartitionIdMessage(messagesFactory, (ZonePartitionId) replicationGroupId); + } + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java index f536765bf09..2287e5b18e5 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java @@ -1044,6 +1044,7 @@ private InternalTransaction startTxWithEnlistedPartition(int partId, boolean rea tx.enlist( tblPartId, + tblPartId.tableId(), new IgniteBiTuple<>( ignite.clusterNodes().stream().filter(n -> n.name().equals(primaryReplica.getLeaseholder())) .findFirst().orElseThrow(), diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 570ecd1ea3e..14bccb6fe36 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -1148,11 +1148,13 @@ private void enlist(int tableId, Int2ObjectMap assignm tx.assignCommitPartition(new TablePartitionId(tableId, ThreadLocalRandom.current().nextInt(partsCnt))); for (Map.Entry partWithToken : assignments.int2ObjectEntrySet()) { + // TODO: IGNITE-24482 - enlist either table or zone partition ID. TablePartitionId tablePartId = new TablePartitionId(tableId, partWithToken.getKey()); NodeWithConsistencyToken assignment = partWithToken.getValue(); tx.enlist(tablePartId, + tableId, new IgniteBiTuple<>( topSrvc.getByConsistentId(assignment.name()), assignment.enlistmentConsistencyToken()) diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java index 2e0059c8e73..64915cd0eba 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java @@ -19,6 +19,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.times; import java.util.List; @@ -99,7 +100,7 @@ void testEnlistCall() { // No op. } - Mockito.verify(spiedTx, times(2)).enlist(any(), any()); + Mockito.verify(spiedTx, times(2)).enlist(any(), anyInt(), any()); } private static QueryChecker assertQuery(String qry, InternalTransaction tx) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java index 68b11f78595..e4d0b6e292a 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.network.ClusterNodeImpl; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxState; @@ -143,7 +144,7 @@ public UUID coordinatorId() { } @Override - public IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) { + public IgniteBiTuple enlistedNodeAndConsistencyToken(ReplicationGroupId tablePartitionId) { return tuple; } @@ -187,8 +188,11 @@ public long timeout() { } @Override - public IgniteBiTuple enlist(TablePartitionId tablePartitionId, - IgniteBiTuple nodeAndConsistencyToken) { + public IgniteBiTuple enlist( + ReplicationGroupId replicationGroupId, + int tableId, + IgniteBiTuple nodeAndConsistencyToken + ) { return nodeAndConsistencyToken; } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java index 95435955d03..70f87335908 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java @@ -82,7 +82,7 @@ protected InternalTransaction startTx() { ClusterNode primaryReplicaNode = getPrimaryReplica(tblPartId); - tx.enlist(tblPartId, new IgniteBiTuple<>(primaryReplicaNode, term)); + tx.enlist(tblPartId, internalTbl.tableId(), new IgniteBiTuple<>(primaryReplicaNode, term)); return tx; } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 96340dd5300..946dc37599f 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -220,7 +220,8 @@ public CompletableFuture finish( HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commitIntent, - Map> enlistedGroups, + Map> enlistedGroups, + Set enlistedTableIds, UUID txId ) { return nullCompletedFuture(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 0eae000633f..a3c0559adf8 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -717,6 +717,7 @@ private CompletableFuture triggerTxRecovery(UUID txId, UUID senderId) { false, // Enlistment consistency token is not required for the rollback, so it is 0L. Map.of(replicationGroupId, new IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)), + Set.of(replicationGroupId.tableId()), txId ) .whenComplete((v, ex) -> runCleanupOnNode(replicationGroupId, txId, senderId)); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index ad8a284ba11..f3bde11dc3e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -43,6 +43,7 @@ import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_REPLACE_IF_EXIST; import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT; import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT_ALL; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage; import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage; import static org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicableTx; @@ -314,7 +315,7 @@ public synchronized void name(String newName) { private CompletableFuture enlistInTx( BinaryRowEx row, @Nullable InternalTransaction tx, - IgniteTriFunction fac, + IgniteTriFunction fac, BiPredicate noWriteChecker ) { return enlistInTx(row, tx, fac, noWriteChecker, null); @@ -333,7 +334,7 @@ private CompletableFuture enlistInTx( private CompletableFuture enlistInTx( BinaryRowEx row, @Nullable InternalTransaction tx, - IgniteTriFunction fac, + IgniteTriFunction fac, BiPredicate noWriteChecker, @Nullable Long txStartTs ) { @@ -353,7 +354,7 @@ private CompletableFuture enlistInTx( int partId = partitionId(row); - TablePartitionId partGroupId = new TablePartitionId(tableId, partId); + ReplicationGroupId partGroupId = targetReplicationGroupId(partId); IgniteBiTuple primaryReplicaAndConsistencyToken = actualTx.enlistedNodeAndConsistencyToken(partGroupId); @@ -923,7 +924,7 @@ public CompletableFuture get(BinaryRowEx keyRow, @Nullable InternalTr keyRow, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .schemaVersion(keyRow.schemaVersion()) .primaryKey(keyRow.tupleSlice()) @@ -1141,7 +1142,7 @@ public CompletableFuture upsert(BinaryRowEx row, @Nullable InternalTransac row, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(row.schemaVersion()) @@ -1231,7 +1232,7 @@ public CompletableFuture getAndUpsert(BinaryRowEx row, InternalTransa row, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(row.schemaVersion()) @@ -1254,7 +1255,7 @@ public CompletableFuture insert(BinaryRowEx row, InternalTransaction tx row, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(row.schemaVersion()) @@ -1333,7 +1334,7 @@ public CompletableFuture replace(BinaryRowEx row, InternalTransaction t row, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(row.schemaVersion()) @@ -1359,7 +1360,7 @@ public CompletableFuture replace(BinaryRowEx oldRow, BinaryRowEx newRow newRow, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(oldRow.schemaVersion()) @@ -1385,7 +1386,7 @@ public CompletableFuture getAndReplace(BinaryRowEx row, InternalTrans row, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(row.schemaVersion()) @@ -1408,7 +1409,7 @@ public CompletableFuture delete(BinaryRowEx keyRow, InternalTransaction keyRow, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(keyRow.schemaVersion()) @@ -1431,7 +1432,7 @@ public CompletableFuture deleteExact(BinaryRowEx oldRow, InternalTransa oldRow, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(oldRow.schemaVersion()) @@ -1456,7 +1457,7 @@ public CompletableFuture getAndDelete(BinaryRowEx row, InternalTransa row, tx, (txo, groupId, enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest() - .groupId(serializeTablePartitionId(groupId)) + .groupId(serializeReplicationGroupId(groupId)) .tableId(tableId) .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) .schemaVersion(row.schemaVersion()) @@ -1831,7 +1832,7 @@ private CompletableFuture completeScan( : replicaGrpId; ScanCloseReplicaRequest scanCloseReplicaRequest = TABLE_MESSAGES_FACTORY.scanCloseReplicaRequest() - .groupId(colocationAwareSerializeReplicationGroupId(colocationAwareReplicationGroupId)) + .groupId(serializeReplicationGroupId(colocationAwareReplicationGroupId)) .tableId(replicaGrpId.tableId()) .transactionId(txId) .scanId(scanId) @@ -2007,12 +2008,12 @@ protected CompletableFuture> enlist(int partId, ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(tablePartitionId, now); Function> enlistClo = replicaMeta -> { - TablePartitionId partGroupId = new TablePartitionId(tableId, partId); + ReplicationGroupId partGroupId = targetReplicationGroupId(partId); IgniteBiTuple enlistState = new IgniteBiTuple<>(getClusterNode(replicaMeta), enlistmentConsistencyToken(replicaMeta)); - tx.enlist(partGroupId, enlistState); + tx.enlist(partGroupId, tableId, enlistState); return enlistState; }; @@ -2170,8 +2171,8 @@ public CompletableFuture estimatedSize() { var invokeFutures = new CompletableFuture[partitions]; for (int partId = 0; partId < partitions; partId++) { - ReplicationGroupId replicaGroupId = colocationAwareReplicationGroupId(partId); - ReplicationGroupIdMessage partitionIdMessage = colocationAwareSerializeReplicationGroupId(replicaGroupId); + ReplicationGroupId replicaGroupId = targetReplicationGroupId(partId); + ReplicationGroupIdMessage partitionIdMessage = serializeReplicationGroupId(replicaGroupId); Function requestFactory = replicaMeta -> TABLE_MESSAGES_FACTORY.getEstimatedSizeRequest() @@ -2187,6 +2188,18 @@ public CompletableFuture estimatedSize() { .thenApply(v -> Arrays.stream(invokeFutures).mapToLong(f -> (Long) f.join()).sum()); } + private ReplicationGroupId targetReplicationGroupId(int partId) { + if (enabledColocationFeature) { + return new ZonePartitionId(zoneId, partId); + } else { + return new TablePartitionId(tableId, partId); + } + } + + private static ReplicationGroupIdMessage serializeReplicationGroupId(ReplicationGroupId replicationGroupId) { + return toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId); + } + @Override public StreamerReceiverRunner streamerReceiverRunner() { return streamerReceiverRunner; @@ -2335,16 +2348,4 @@ private void checkTransactionFinishStarted(@Nullable InternalTransaction transac )); } } - - private ReplicationGroupId colocationAwareReplicationGroupId(int partId) { - return enabledColocationFeature - ? new ZonePartitionId(zoneId, partId) - : new TablePartitionId(tableId, partId); - } - - private ReplicationGroupIdMessage colocationAwareSerializeReplicationGroupId(ReplicationGroupId replicaGroupId) { - return enabledColocationFeature - ? serializeZonePartitionId((ZonePartitionId) replicaGroupId) - : serializeTablePartitionId((TablePartitionId) replicaGroupId); - } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java index d46617ea130..accf42f3961 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; @@ -112,7 +113,7 @@ public void testUpdateCommand() throws Exception { } @Test - public void testRemoveCommand() throws Exception { + public void testRemoveCommand() { TablePartitionIdMessage tablePartitionIdMessage = REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage() .tableId(1) .partitionId(1) @@ -182,7 +183,7 @@ public void testUpdateAllCommand() throws Exception { } @Test - public void testRemoveAllCommand() throws Exception { + public void testRemoveAllCommand() { Map rowsToRemove = new HashMap<>(); for (int i = 0; i < 10; i++) { @@ -215,7 +216,7 @@ public void testRemoveAllCommand() throws Exception { } @Test - public void testTxCleanupCommand() throws Exception { + public void testTxCleanupCommand() { HybridClock clock = new HybridClockImpl(); WriteIntentSwitchCommand cmd = PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() @@ -233,9 +234,9 @@ public void testTxCleanupCommand() throws Exception { } @Test - public void testFinishTxCommand() throws Exception { + public void testFinishTxCommand() { HybridClock clock = new HybridClockImpl(); - ArrayList grps = new ArrayList<>(10); + ArrayList grps = new ArrayList<>(10); for (int i = 0; i < 10; i++) { grps.add(REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage() @@ -311,7 +312,7 @@ private T copyCommand(T cmd) { } } - private BinaryRowMessage binaryRowMessage(int id) throws Exception { + private BinaryRowMessage binaryRowMessage(int id) { Row row = kvMarshaller.marshal( new TestKey(id, String.valueOf(id)), new TestValue(id, String.valueOf(id)) diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 2f510fce89f..ee69fb3e9bc 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; @@ -738,7 +739,7 @@ private void reset() { @Test public void testTxStateReplicaRequestEmptyState() throws Exception { doAnswer(invocation -> { - UUID txId = invocation.getArgument(4); + UUID txId = invocation.getArgument(5); txManager.updateTxMeta(txId, old -> new TxStateMeta( ABORTED, @@ -749,7 +750,7 @@ public void testTxStateReplicaRequestEmptyState() throws Exception { )); return nullCompletedFuture(); - }).when(txManager).finish(any(), any(), anyBoolean(), any(), any()); + }).when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any()); CompletableFuture fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() .groupId(tablePartitionIdMessage(grpId)) @@ -1813,6 +1814,7 @@ private CompletableFuture beginAndAbortTx() { .commitPartitionId(tablePartitionIdMessage(grpId)) .txId(txId) .groups(Map.of(tablePartitionIdMessage(grpId), localNode.name())) + .tableIds(Set.of(grpId.tableId())) .commit(false) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .build(); @@ -1878,6 +1880,7 @@ private CompletableFuture beginAndCommitTx() { .commitPartitionId(tablePartitionIdMessage(grpId)) .txId(txId) .groups(Map.of(tablePartitionIdMessage(grpId), localNode.name())) + .tableIds(Set.of(grpId.tableId())) .commit(true) .commitTimestamp(commitTimestamp) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) @@ -2225,7 +2228,7 @@ private static void configureTxManager(TxManager txManager) { doAnswer(invocation -> nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class)); - doAnswer(invocation -> nullCompletedFuture()).when(txManager).finish(any(), any(), anyBoolean(), any(), any()); + doAnswer(invocation -> nullCompletedFuture()).when(txManager).finish(any(), any(), anyBoolean(), any(), any(), any()); doAnswer(invocation -> nullCompletedFuture()).when(txManager).cleanup(any(), anyString(), any()); } @@ -2651,6 +2654,7 @@ private void testCommitRequestIfTableWasDropped( .groupId(tablePartitionIdMessage(commitPartitionId)) .commitPartitionId(tablePartitionIdMessage(commitPartitionId)) .groups(groups.entrySet().stream().collect(toMap(e -> tablePartitionIdMessage(e.getKey()), Map.Entry::getValue))) + .tableIds(groups.keySet().stream().map(TablePartitionId::tableId).collect(toSet())) .txId(txId) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .commit(true) diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java index d2ba2a059ad..67e5a3eac3e 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java @@ -47,6 +47,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -739,11 +740,13 @@ public void testFinishAlreadyFinishedTx() throws Exception { IgniteImpl txCrdNode2 = unwrapIgniteImpl(node(0)); + TablePartitionId commitPartition = ((InternalTransaction) rwTx1).commitPartition(); CompletableFuture finish2 = txCrdNode2.txManager().finish( HybridTimestampTracker.atomicTracker(null), - ((InternalTransaction) rwTx1).commitPartition(), + commitPartition, false, - Map.of(((InternalTransaction) rwTx1).commitPartition(), new IgniteBiTuple<>(txCrdNode2.node(), 0L)), + Map.of(commitPartition, new IgniteBiTuple<>(txCrdNode2.node(), 0L)), + Set.of(commitPartition.tableId()), rwTx1Id ); diff --git a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java index 3c77e812f8b..ffdcf54d0c2 100644 --- a/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java +++ b/modules/transactions/src/jmh/java/org/apache/ignite/internal/tx/impl/TransactionExpirationRegistryBenchmark.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxState; @@ -94,7 +95,7 @@ public UUID id() { } @Override - public IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) { + public IgniteBiTuple enlistedNodeAndConsistencyToken(ReplicationGroupId replicationGroupId) { return null; } @@ -114,8 +115,11 @@ public TablePartitionId commitPartition() { } @Override - public IgniteBiTuple enlist(TablePartitionId tablePartitionId, - IgniteBiTuple nodeAndConsistencyToken) { + public IgniteBiTuple enlist( + ReplicationGroupId replicationGroupId, + int tableId, + IgniteBiTuple nodeAndConsistencyToken + ) { return null; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java index 6a1e561f45b..669df10169d 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.tx.Transaction; @@ -40,10 +41,10 @@ public interface InternalTransaction extends Transaction { /** * Returns enlisted primary replica node associated with given replication group. * - * @param tablePartitionId Table partition id. + * @param replicationGroupId Replication group ID. * @return Enlisted primary replica node and consistency token associated with given replication group. */ - IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId); + IgniteBiTuple enlistedNodeAndConsistencyToken(ReplicationGroupId replicationGroupId); /** * Returns a transaction state. @@ -70,11 +71,16 @@ public interface InternalTransaction extends Transaction { /** * Enlists a partition group into a transaction. * - * @param tablePartitionId Table partition id to enlist. + * @param replicationGroupId Replication group id to enlist. + * @param tableId Table ID for enlistment. * @param nodeAndConsistencyToken Primary replica cluster node and consistency token to enlist for given replication group. * @return {@code True} if a partition is enlisted into the transaction. */ - IgniteBiTuple enlist(TablePartitionId tablePartitionId, IgniteBiTuple nodeAndConsistencyToken); + IgniteBiTuple enlist( + ReplicationGroupId replicationGroupId, + int tableId, + IgniteBiTuple nodeAndConsistencyToken + ); /** * Returns read timestamp for the given transaction if it is a read-only one or {code null} otherwise. diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index d5eecd46e97..182833e7b5c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -158,13 +159,15 @@ default InternalTransaction beginExplicitRo(HybridTimestampTracker timestampTrac * @param commitPartition Partition to store a transaction state. * @param commit {@code true} if a commit requested. * @param enlistedGroups Enlisted partition groups with consistency tokens. + * @param enlistedTableIds IDs of the tables taking part in the transaction. * @param txId Transaction id. */ CompletableFuture finish( HybridTimestampTracker timestampTracker, TablePartitionId commitPartition, boolean commit, - Map> enlistedGroups, + Map> enlistedGroups, + Set enlistedTableIds, UUID txId ); @@ -182,7 +185,7 @@ CompletableFuture finish( */ CompletableFuture cleanup( ReplicationGroupId commitPartitionId, - Map enlistedPartitions, + Map enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId @@ -202,7 +205,7 @@ CompletableFuture cleanup( */ CompletableFuture cleanup( TablePartitionId commitPartitionId, - Collection enlistedPartitions, + Collection enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java index 3ad3cd1df87..c69f3aecd21 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java @@ -18,15 +18,15 @@ package org.apache.ignite.internal.tx; import static java.util.Collections.unmodifiableCollection; -import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage; import java.util.ArrayList; import java.util.Collection; import java.util.Objects; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; -import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.message.TxMessagesFactory; import org.apache.ignite.internal.tx.message.TxMetaMessage; @@ -41,7 +41,7 @@ public class TxMeta implements TransactionMeta { private final TxState txState; /** Collection of enlisted partition groups. */ - private final Collection enlistedPartitions; + private final Collection enlistedPartitions; /** Commit timestamp. */ private final @Nullable HybridTimestamp commitTimestamp; @@ -53,7 +53,7 @@ public class TxMeta implements TransactionMeta { * @param enlistedPartitions Collection of enlisted partition groups. * @param commitTimestamp Commit timestamp. */ - public TxMeta(TxState txState, Collection enlistedPartitions, @Nullable HybridTimestamp commitTimestamp) { + public TxMeta(TxState txState, Collection enlistedPartitions, @Nullable HybridTimestamp commitTimestamp) { this.txState = txState; this.enlistedPartitions = enlistedPartitions; this.commitTimestamp = commitTimestamp; @@ -64,7 +64,7 @@ public TxState txState() { return txState; } - public Collection enlistedPartitions() { + public Collection enlistedPartitions() { return unmodifiableCollection(enlistedPartitions); } @@ -75,10 +75,10 @@ public Collection enlistedPartitions() { @Override public TxMetaMessage toTransactionMetaMessage(ReplicaMessagesFactory replicaMessagesFactory, TxMessagesFactory txMessagesFactory) { - var enlistedPartitionMessages = new ArrayList(enlistedPartitions.size()); + var enlistedPartitionMessages = new ArrayList(enlistedPartitions.size()); - for (TablePartitionId enlistedPartition : enlistedPartitions) { - enlistedPartitionMessages.add(toTablePartitionIdMessage(replicaMessagesFactory, enlistedPartition)); + for (ReplicationGroupId enlistedPartition : enlistedPartitions) { + enlistedPartitionMessages.add(toReplicationGroupIdMessage(replicaMessagesFactory, enlistedPartition)); } return txMessagesFactory.txMetaMessage() diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMetaSerializer.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMetaSerializer.java index 51f5df511a0..178d6ef5de0 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMetaSerializer.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMetaSerializer.java @@ -17,11 +17,17 @@ package org.apache.ignite.internal.tx; +import static org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG; +import static org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean; + import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.PartitionGroupId; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.util.io.IgniteDataInput; import org.apache.ignite.internal.util.io.IgniteDataOutput; import org.apache.ignite.internal.versioned.VersionedSerializer; @@ -33,14 +39,19 @@ public class TxMetaSerializer extends VersionedSerializer { /** Serializer instance. */ public static final TxMetaSerializer INSTANCE = new TxMetaSerializer(); + // TODO IGNITE-22115 remove it + private static final boolean ENABLED_COLOCATION_FEATURE = getBoolean(COLOCATION_FEATURE_FLAG, false); + @Override protected void writeExternalData(TxMeta meta, IgniteDataOutput out) throws IOException { out.writeVarInt(meta.txState().ordinal()); out.writeVarInt(meta.enlistedPartitions().size()); - for (TablePartitionId partitionId : meta.enlistedPartitions()) { - out.writeVarInt(partitionId.tableId()); - out.writeVarInt(partitionId.partitionId()); + for (ReplicationGroupId partitionId : meta.enlistedPartitions()) { + PartitionGroupId partitionGroupId = (PartitionGroupId) partitionId; + + out.writeVarInt(partitionGroupId.objectId()); + out.writeVarInt(partitionGroupId.partitionId()); } HybridTimestamp.write(meta.commitTimestamp(), out); @@ -49,23 +60,31 @@ protected void writeExternalData(TxMeta meta, IgniteDataOutput out) throws IOExc @Override protected TxMeta readExternalData(byte protoVer, IgniteDataInput in) throws IOException { TxState state = TxState.fromOrdinal(in.readVarIntAsInt()); - List enlistedPartitions = readEnlistedPartitions(in); + List enlistedPartitions = readEnlistedPartitions(in); HybridTimestamp commitTimestamp = HybridTimestamp.readNullableFrom(in); return new TxMeta(state, enlistedPartitions, commitTimestamp); } - private static List readEnlistedPartitions(IgniteDataInput in) throws IOException { + private static List readEnlistedPartitions(IgniteDataInput in) throws IOException { int length = in.readVarIntAsInt(); - List enlistedPartitions = new ArrayList<>(length); + List enlistedPartitions = new ArrayList<>(length); for (int i = 0; i < length; i++) { - int tableId = in.readVarIntAsInt(); + int objectId = in.readVarIntAsInt(); int partitionId = in.readVarIntAsInt(); - enlistedPartitions.add(new TablePartitionId(tableId, partitionId)); + enlistedPartitions.add(replicationGroupId(objectId, partitionId)); } return enlistedPartitions; } + + private static ReplicationGroupId replicationGroupId(int objectId, int partitionId) { + if (ENABLED_COLOCATION_FEATURE) { + return new ZonePartitionId(objectId, partitionId); + } else { + return new TablePartitionId(objectId, partitionId); + } + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java index 26dc59cfba6..faa80e005f8 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java @@ -39,7 +39,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.ReplicaMeta; -import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.tx.TransactionException; /** @@ -74,14 +74,14 @@ public PlacementDriverHelper(PlacementDriver placementDriver, ClockService clock * @return Future that completes with node id that is a primary for the provided partition, or completes with exception if no primary * appeared during the await timeout. */ - public CompletableFuture awaitPrimaryReplicaWithExceptionHandling(TablePartitionId partitionId) { + public CompletableFuture awaitPrimaryReplicaWithExceptionHandling(ReplicationGroupId partitionId) { HybridTimestamp timestamp = clockService.now(); return awaitPrimaryReplicaWithExceptionHandling(partitionId, timestamp); } private CompletableFuture awaitPrimaryReplicaWithExceptionHandling( - TablePartitionId partitionId, + ReplicationGroupId partitionId, HybridTimestamp timestamp ) { return placementDriver.awaitPrimaryReplica(partitionId, timestamp, AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS) @@ -105,7 +105,7 @@ private CompletableFuture awaitPrimaryReplicaWithExceptionHandling( * @return A future that completes with a map of node to the partitions the node is primary for and a collection of partitions that we * failed to find the primary for. */ - public CompletableFuture findPrimaryReplicas(Collection partitions) { + public CompletableFuture findPrimaryReplicas(Collection partitions) { // Please note that we are using `get primary replica` instead of `await primary replica`. // This method is faster, yet we still have the correctness: // If the primary replica has not changed, get will return a valid value and we'll send an unlock request to this node. @@ -120,7 +120,7 @@ public CompletableFuture findPrimaryReplicas(Collection>> awaitPrimaryReplicas(Collection partitions) { + public CompletableFuture>> awaitPrimaryReplicas(Collection partitions) { return computePrimaryReplicas(partitions, this::awaitPrimaryReplicaWithExceptionHandling) .thenApply(partitionData -> partitionData.partitionsByNode); } @@ -133,8 +133,8 @@ public CompletableFuture>> awaitPrimaryReplica * failed to find the primary for. */ private CompletableFuture computePrimaryReplicas( - Collection partitions, - BiFunction> placementFunction + Collection partitions, + BiFunction> placementFunction ) { if (partitions == null || partitions.isEmpty()) { return completedFuture(new PartitionData(emptyMap(), emptySet())); @@ -142,23 +142,23 @@ private CompletableFuture computePrimaryReplicas( HybridTimestamp timestamp = clockService.now(); - Map> primaryReplicaFutures = new HashMap<>(); + Map> primaryReplicaFutures = new HashMap<>(); - for (TablePartitionId partitionId : partitions) { + for (ReplicationGroupId partitionId : partitions) { primaryReplicaFutures.put(partitionId, placementFunction.apply(partitionId, timestamp)); } return allOf(primaryReplicaFutures.values().toArray(new CompletableFuture[0])) .thenApply(v -> { - Map> partitionsByNode = new HashMap<>(); + Map> partitionsByNode = new HashMap<>(); - Set partitionsWithoutPrimary = new HashSet<>(); + Set partitionsWithoutPrimary = new HashSet<>(); - for (Entry> entry : primaryReplicaFutures.entrySet()) { + for (Entry> entry : primaryReplicaFutures.entrySet()) { // Safe to call join, the future has already finished. ReplicaMeta meta = entry.getValue().join(); - TablePartitionId partition = entry.getKey(); + ReplicationGroupId partition = entry.getKey(); if (meta != null && meta.getLeaseholder() != null) { partitionsByNode.computeIfAbsent(meta.getLeaseholder(), s -> new HashSet<>()) @@ -175,11 +175,11 @@ private CompletableFuture computePrimaryReplicas( * The result of retrieving primary replicas for a collection of partitions. */ public static class PartitionData { - final Map> partitionsByNode; + final Map> partitionsByNode; - final Set partitionsWithoutPrimary; + final Set partitionsWithoutPrimary; - PartitionData(Map> partitionsByNode, Set partitionsWithoutPrimary) { + PartitionData(Map> partitionsByNode, Set partitionsWithoutPrimary) { this.partitionsByNode = partitionsByNode; this.partitionsWithoutPrimary = partitionsWithoutPrimary; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java index 73787429772..422e2987a1b 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java @@ -26,6 +26,7 @@ import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.thread.PublicApiThreading; import org.apache.ignite.internal.tx.InternalTransaction; @@ -86,8 +87,8 @@ public UUID id() { } @Override - public IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) { - return transaction.enlistedNodeAndConsistencyToken(tablePartitionId); + public IgniteBiTuple enlistedNodeAndConsistencyToken(ReplicationGroupId replicationGroupId) { + return transaction.enlistedNodeAndConsistencyToken(replicationGroupId); } @Override @@ -106,9 +107,12 @@ public TablePartitionId commitPartition() { } @Override - public IgniteBiTuple enlist(TablePartitionId tablePartitionId, - IgniteBiTuple nodeAndConsistencyToken) { - return transaction.enlist(tablePartitionId, nodeAndConsistencyToken); + public IgniteBiTuple enlist( + ReplicationGroupId replicationGroupId, + int tableId, + IgniteBiTuple nodeAndConsistencyToken + ) { + return transaction.enlist(replicationGroupId, tableId, nodeAndConsistencyToken); } @Override diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java index 24dd0210d09..d7e7793eb00 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.hlc.HybridTimestampTracker; import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.network.ClusterNode; @@ -86,14 +87,15 @@ public HybridTimestamp startTimestamp() { @Override public IgniteBiTuple enlist( - TablePartitionId tablePartitionId, + ReplicationGroupId replicationGroupId, + int tableId, IgniteBiTuple nodeAndConsistencyToken ) { return null; } @Override - public IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) { + public IgniteBiTuple enlistedNodeAndConsistencyToken(ReplicationGroupId replicationGroupId) { return null; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index 8e5cbc510e4..24d028f03d8 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -25,6 +25,7 @@ import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -33,6 +34,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.hlc.HybridTimestampTracker; import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.TransactionIds; import org.apache.ignite.internal.tx.TxManager; @@ -49,7 +51,9 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { AtomicReferenceFieldUpdater.newUpdater(ReadWriteTransactionImpl.class, TablePartitionId.class, "commitPart"); /** Enlisted partitions: partition id -> (primary replica node, enlistment consistency token). */ - private final Map> enlisted = new ConcurrentHashMap<>(); + private final Map> enlisted = new ConcurrentHashMap<>(); + + private final Set enlistedTableIds = ConcurrentHashMap.newKeySet(); /** A partition which stores the transaction state. */ private volatile TablePartitionId commitPart; @@ -96,14 +100,15 @@ public TablePartitionId commitPartition() { /** {@inheritDoc} */ @Override - public IgniteBiTuple enlistedNodeAndConsistencyToken(TablePartitionId partGroupId) { + public IgniteBiTuple enlistedNodeAndConsistencyToken(ReplicationGroupId partGroupId) { return enlisted.get(partGroupId); } /** {@inheritDoc} */ @Override public IgniteBiTuple enlist( - TablePartitionId tablePartitionId, + ReplicationGroupId replicationGroupId, + int tableId, IgniteBiTuple nodeAndConsistencyToken ) { // No need to wait for lock if commit is in progress. @@ -115,7 +120,9 @@ public IgniteBiTuple enlist( try { checkEnlistPossibility(); - return enlisted.computeIfAbsent(tablePartitionId, k -> nodeAndConsistencyToken); + enlistedTableIds.add(tableId); + + return enlisted.computeIfAbsent(replicationGroupId, k -> nodeAndConsistencyToken); } finally { enlistPartitionLock.readLock().unlock(); } @@ -211,6 +218,7 @@ private CompletableFuture finishInternal( commitPart, commit, enlisted, + enlistedTableIds, id() ); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java index cd6f09c75e5..743b7e221c2 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException; import org.apache.ignite.internal.tx.TransactionResult; @@ -150,7 +151,7 @@ void markReadOnlyTxFinished(UUID txId) { }); } - ReadWriteTxContext lockTxForNewUpdates(UUID txId, Map> enlistedGroups) { + ReadWriteTxContext lockTxForNewUpdates(UUID txId, Map> enlistedGroups) { return (ReadWriteTxContext) txCtxMap.compute(txId, (uuid, tuple0) -> { if (tuple0 == null) { tuple0 = new ReadWriteTxContext(placementDriver, clockService); // No writes enlisted. @@ -186,7 +187,7 @@ void removeInflight(UUID txId) { abstract void onInflightsRemoved(); - abstract void finishTx(@Nullable Map> enlistedGroups); + abstract void finishTx(@Nullable Map> enlistedGroups); abstract boolean isTxFinishing(); @@ -209,7 +210,7 @@ public void onInflightsRemoved() { } @Override - public void finishTx(@Nullable Map> enlistedGroups) { + public void finishTx(@Nullable Map> enlistedGroups) { markedFinished = true; } @@ -233,8 +234,8 @@ static class ReadWriteTxContext extends TxContext { private final CompletableFuture waitRepFut = new CompletableFuture<>(); private final PlacementDriver placementDriver; private volatile CompletableFuture finishInProgressFuture = null; - private volatile Map> enlistedGroups; - private ClockService clockService; + private volatile Map> enlistedGroups; + private final ClockService clockService; private ReadWriteTxContext(PlacementDriver placementDriver, ClockService clockService) { this.placementDriver = placementDriver; @@ -286,7 +287,7 @@ private CompletableFuture waitReadyToFinish(boolean commit) { int cntr = 0; - for (Map.Entry> e : enlistedGroups.entrySet()) { + for (Map.Entry> e : enlistedGroups.entrySet()) { futures[cntr++] = placementDriver.getPrimaryReplica(e.getKey(), now) .thenApply(replicaMeta -> { Long enlistmentConsistencyToken = e.getValue().get2(); @@ -326,7 +327,7 @@ public void onInflightsRemoved() { } @Override - public void finishTx(Map> enlistedGroups) { + public void finishTx(Map> enlistedGroups) { this.enlistedGroups = enlistedGroups; finishInProgressFuture = new CompletableFuture<>(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java index 9ad33f464fb..0e8de4ee80f 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.tx.impl; import static java.util.concurrent.CompletableFuture.allOf; -import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage; import java.util.ArrayList; import java.util.Collection; @@ -35,10 +35,10 @@ import org.apache.ignite.internal.network.ChannelType; import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.network.NetworkMessage; -import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaResponse; -import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo; import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage; @@ -46,6 +46,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.message.TxMessagesFactory; import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -116,17 +117,17 @@ public void stop() { private void processTxCleanup(TxCleanupMessage txCleanupMessage, ClusterNode sender, @Nullable Long correlationId) { assert correlationId != null; - Map> writeIntentSwitches = new HashMap<>(); + Map> writeIntentSwitches = new HashMap<>(); // These cleanups will all be local. - List groups = txCleanupMessage.groups(); + List groups = txCleanupMessage.groups(); if (groups != null) { - Set groupSet = asTablePartitionIdSet(groups); + Set groupSet = asTablePartitionIdSet(groups); trackPartitions(txCleanupMessage.txId(), groupSet, sender); - for (TablePartitionId group : groupSet) { + for (ReplicationGroupId group : groupSet) { CompletableFuture future = writeIntentSwitchProcessor.switchLocalWriteIntents( group, txCleanupMessage.txId(), @@ -204,7 +205,7 @@ private NetworkMessage prepareErrorResponse(UUID txId, Throwable th) { * @param groups Replication groups. * @param sender Cleanup request sender, needed to send cleanup replicated response. */ - private void trackPartitions(UUID txId, Set groups, ClusterNode sender) { + private void trackPartitions(UUID txId, Set groups, ClusterNode sender) { writeIntentsReplicated.put(txId, new CleanupContext(sender, groups, groups)); } @@ -233,7 +234,7 @@ private void processWriteIntentSwitchResponse(ReplicaResponse response) { */ void writeIntentSwitchReplicated(WriteIntentSwitchReplicatedInfo info) { CleanupContext cleanupContext = writeIntentsReplicated.computeIfPresent(info.txId(), (uuid, context) -> { - Set partitions = new HashSet<>(context.partitions); + Set partitions = new HashSet<>(context.partitions); partitions.remove(info.partitionId()); return new CleanupContext(context.sender, partitions, context.initialPartitions); @@ -254,18 +255,18 @@ void writeIntentSwitchReplicated(WriteIntentSwitchReplicatedInfo info) { * @param sender Cleanup request sender. * @param partitions Partitions that we received replication confirmation for. */ - private void sendCleanupReplicatedResponse(UUID txId, ClusterNode sender, Collection partitions) { + private void sendCleanupReplicatedResponse(UUID txId, ClusterNode sender, Collection partitions) { messagingService.send(sender, ChannelType.DEFAULT, prepareResponse(new CleanupReplicatedInfo(txId, partitions))); } private static class CleanupContext { private final ClusterNode sender; - private final Set partitions; + private final Set partitions; - private final Set initialPartitions; + private final Set initialPartitions; - public CleanupContext(ClusterNode sender, Set partitions, Set initialPartitions) { + public CleanupContext(ClusterNode sender, Set partitions, Set initialPartitions) { this.sender = sender; this.partitions = partitions; this.initialPartitions = initialPartitions; @@ -273,11 +274,11 @@ public CleanupContext(ClusterNode sender, Set partitions, Set< } private static CleanupReplicatedInfoMessage toCleanupReplicatedInfoMessage(CleanupReplicatedInfo info) { - Collection partitions = info.partitions(); - List partitionMessages = new ArrayList<>(partitions.size()); + Collection partitions = info.partitions(); + List partitionMessages = new ArrayList<>(partitions.size()); - for (TablePartitionId partition : partitions) { - partitionMessages.add(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, partition)); + for (ReplicationGroupId partition : partitions) { + partitionMessages.add(toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, partition)); } return TX_MESSAGES_FACTORY.cleanupReplicatedInfoMessage() @@ -286,11 +287,11 @@ private static CleanupReplicatedInfoMessage toCleanupReplicatedInfoMessage(Clean .build(); } - private static Set asTablePartitionIdSet(List messages) { - var set = new HashSet(messages.size()); + private static Set asTablePartitionIdSet(List messages) { + var set = new HashSet(IgniteUtils.capacity(messages.size())); - for (int i = 0; i < messages.size(); i++) { - set.add(messages.get(i).asTablePartitionId()); + for (ReplicationGroupIdMessage message : messages) { + set.add(message.asReplicationGroupId()); } return set; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java index b994af65d7c..12ad076d3b4 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java @@ -154,7 +154,7 @@ public CompletableFuture cleanup(TablePartitionId commitPartitionId, Strin */ public CompletableFuture cleanup( ReplicationGroupId commitPartitionId, - Map enlistedPartitions, + Map enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId @@ -165,7 +165,7 @@ public CompletableFuture cleanup( new CleanupContext(commitPartitionId, enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED) ); - Map> partitions = new HashMap<>(); + Map> partitions = new HashMap<>(); enlistedPartitions.forEach((partitionId, nodeId) -> partitions.computeIfAbsent(nodeId, node -> new HashSet<>()).add(partitionId)); @@ -184,7 +184,7 @@ public CompletableFuture cleanup( */ public CompletableFuture cleanup( ReplicationGroupId commitPartitionId, - Collection partitionIds, + Collection partitionIds, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId @@ -197,8 +197,13 @@ public CompletableFuture cleanup( return placementDriverHelper.findPrimaryReplicas(partitionIds) .thenCompose(partitionData -> { - cleanupPartitionsWithoutPrimary(commitPartitionId, commit, commitTimestamp, txId, - partitionData.partitionsWithoutPrimary); + cleanupPartitionsWithoutPrimary( + commitPartitionId, + commit, + commitTimestamp, + txId, + partitionData.partitionsWithoutPrimary + ); return cleanupPartitions(commitPartitionId, partitionData.partitionsByNode, commit, commitTimestamp, txId); }); @@ -209,7 +214,7 @@ private void cleanupPartitionsWithoutPrimary( boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId, - Set noPrimaryFound + Set noPrimaryFound ) { // For the partitions without primary, we need to wait until a new primary is found. // Then we can proceed with the common cleanup flow. @@ -219,16 +224,16 @@ private void cleanupPartitionsWithoutPrimary( private CompletableFuture cleanupPartitions( ReplicationGroupId commitPartitionId, - Map> partitionsByNode, + Map> partitionsByNode, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId ) { List> cleanupFutures = new ArrayList<>(); - for (Entry> entry : partitionsByNode.entrySet()) { + for (Entry> entry : partitionsByNode.entrySet()) { String node = entry.getKey(); - Set nodePartitions = entry.getValue(); + Set nodePartitions = entry.getValue(); cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, node, nodePartitions)); } @@ -242,7 +247,7 @@ private CompletableFuture sendCleanupMessageWithRetries( @Nullable HybridTimestamp commitTimestamp, UUID txId, String node, - @Nullable Collection partitions + @Nullable Collection partitions ) { return txMessageSender.cleanup(node, partitions, txId, commit, commitTimestamp) .handle((networkMessage, throwable) -> { @@ -280,14 +285,14 @@ private static class CleanupContext { /** * The partitions the we have not received write intent replication confirmation for. */ - private final Set partitions; + private final Set partitions; /** * The state of the transaction. */ private final TxState txState; - private CleanupContext(ReplicationGroupId commitPartitionId, Set partitions, TxState txState) { + private CleanupContext(ReplicationGroupId commitPartitionId, Set partitions, TxState txState) { this.commitPartitionId = commitPartitionId; this.partitions = partitions; this.txState = txState; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 2f92e73b46a..c65d26ede8a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -579,7 +579,8 @@ public CompletableFuture finish( HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commitIntent, - Map> enlistedGroups, + Map> enlistedGroups, + Set enlistedTableIds, UUID txId ) { LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent, txId, enlistedGroups); @@ -641,6 +642,7 @@ public CompletableFuture finish( commitPartition, commit, enlistedGroups, + enlistedTableIds, txId, finishingStateMeta.txFinishFuture() ) @@ -666,7 +668,8 @@ private CompletableFuture prepareFinish( HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commit, - Map> enlistedGroups, + Map> enlistedGroups, + Set enlistedTableIds, UUID txId, CompletableFuture txFinishFuture ) { @@ -681,7 +684,7 @@ private CompletableFuture prepareFinish( (unused, throwable) -> { boolean verifiedCommit = throwable == null && commit; - Map replicationGroupIds = enlistedGroups.entrySet().stream() + Map replicationGroupIds = enlistedGroups.entrySet().stream() .collect(Collectors.toMap( Entry::getKey, entry -> entry.getValue().get1().name() @@ -692,6 +695,7 @@ private CompletableFuture prepareFinish( commitPartition, verifiedCommit, replicationGroupIds, + enlistedTableIds, txId, commitTimestamp, txFinishFuture); @@ -708,7 +712,8 @@ private CompletableFuture durableFinish( HybridTimestampTracker observableTimestampTracker, TablePartitionId commitPartition, boolean commit, - Map replicationGroupIds, + Map replicationGroupIds, + Set enlistedTableIds, UUID txId, HybridTimestamp commitTimestamp, CompletableFuture txFinishFuture @@ -722,6 +727,7 @@ private CompletableFuture durableFinish( meta.getStartTime().longValue(), commit, replicationGroupIds, + enlistedTableIds, txId, commitTimestamp, txFinishFuture @@ -761,6 +767,7 @@ private CompletableFuture durableFinish( commitPartition, commit, replicationGroupIds, + enlistedTableIds, txId, commitTimestamp, txFinishFuture @@ -783,18 +790,20 @@ private CompletableFuture makeFinishRequest( String primaryConsistentId, Long enlistmentConsistencyToken, boolean commit, - Map replicationGroupIds, + Map replicationGroupIds, + Set enlistedTableIds, UUID txId, HybridTimestamp commitTimestamp, CompletableFuture txFinishFuture ) { - LOG.debug("Finish [partition={}, node={}, enlistmentConsistencyToken={} commit={}, txId={}, groups={}", - commitPartition, primaryConsistentId, enlistmentConsistencyToken, commit, txId, replicationGroupIds); + LOG.debug("Finish [partition={}, node={}, enlistmentConsistencyToken={} commit={}, txId={}, groups={}, tableIds={}", + commitPartition, primaryConsistentId, enlistmentConsistencyToken, commit, txId, replicationGroupIds, enlistedTableIds); return txMessageSender.finish( primaryConsistentId, commitPartition, replicationGroupIds, + enlistedTableIds, txId, enlistmentConsistencyToken, commit, @@ -941,7 +950,7 @@ public LockManager lockManager() { @Override public CompletableFuture cleanup( ReplicationGroupId commitPartitionId, - Map enlistedPartitions, + Map enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId @@ -952,7 +961,7 @@ public CompletableFuture cleanup( @Override public CompletableFuture cleanup( TablePartitionId commitPartitionId, - Collection enlistedPartitions, + Collection enlistedPartitions, boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId @@ -1040,14 +1049,14 @@ public void onReceived(NetworkMessage message, ClusterNode sender, @Nullable Lon * @return Verification future. */ private CompletableFuture verifyCommitTimestamp( - Map> enlistedGroups, + Map> enlistedGroups, HybridTimestamp commitTimestamp ) { var verificationFutures = new CompletableFuture[enlistedGroups.size()]; int cnt = -1; - for (Map.Entry> enlistedGroup : enlistedGroups.entrySet()) { - TablePartitionId groupId = enlistedGroup.getKey(); + for (Map.Entry> enlistedGroup : enlistedGroups.entrySet()) { + ReplicationGroupId groupId = enlistedGroup.getKey(); Long expectedEnlistmentConsistencyToken = enlistedGroup.getValue().get2(); verificationFutures[++cnt] = placementDriver.getPrimaryReplica(groupId, commitTimestamp) diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java index b1985a6a36a..b0d1a372025 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.tx.impl; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage; import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; import java.util.ArrayList; @@ -24,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.ClockService; @@ -31,9 +33,11 @@ import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaResponse; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; import org.apache.ignite.internal.tx.TransactionMeta; import org.apache.ignite.internal.tx.TransactionResult; @@ -86,7 +90,7 @@ public TxMessageSender( * Sends WriteIntentSwitch request to the specified primary replica. * * @param primaryConsistentId Primary replica to process given cleanup request. - * @param tablePartitionId Table partition id. + * @param replicationGroupId Replication group (partition) id. * @param txId Transaction id. * @param commit {@code True} if a commit requested. * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). @@ -94,7 +98,7 @@ public TxMessageSender( */ public CompletableFuture switchWriteIntents( String primaryConsistentId, - TablePartitionId tablePartitionId, + ReplicationGroupId replicationGroupId, UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp @@ -102,7 +106,7 @@ public CompletableFuture switchWriteIntents( return replicaService.invoke( primaryConsistentId, TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() - .groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId)) + .groupId(toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId)) .timestamp(clockService.now()) .txId(txId) .commit(commit) @@ -115,7 +119,7 @@ public CompletableFuture switchWriteIntents( * Sends cleanup request to the specified primary replica. * * @param primaryConsistentId Primary replica to process given cleanup request. - * @param replicationGroupIds Table partition IDs. + * @param replicationGroupIds Replication group IDs. * @param txId Transaction id. * @param commit {@code True} if a commit requested. * @param commitTimestamp Commit timestamp ({@code null} if it's an abort). @@ -123,7 +127,7 @@ public CompletableFuture switchWriteIntents( */ public CompletableFuture cleanup( String primaryConsistentId, - @Nullable Collection replicationGroupIds, + @Nullable Collection replicationGroupIds, UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp @@ -135,7 +139,7 @@ public CompletableFuture cleanup( .commit(commit) .commitTimestamp(commitTimestamp) .timestamp(clockService.now()) - .groups(toTablePartitionIdMessages(replicationGroupIds)) + .groups(toReplicationGroupIdMessages(replicationGroupIds)) .build(), transactionConfiguration.rpcTimeout().value()); } @@ -146,6 +150,7 @@ public CompletableFuture cleanup( * @param primaryConsistentId Node consistent id to send the request to. * @param commitPartition Partition to store a transaction state. * @param replicationGroupIds Enlisted partition groups. + * @param enlistedTableIds IDs of tables taking part in the transaction. * @param txId Transaction id. * @param consistencyToken Enlistment consistency token. * @param commit {@code true} if a commit requested. @@ -155,7 +160,8 @@ public CompletableFuture cleanup( public CompletableFuture finish( String primaryConsistentId, TablePartitionId commitPartition, - Map replicationGroupIds, + Map replicationGroupIds, + Set enlistedTableIds, UUID txId, Long consistencyToken, boolean commit, @@ -173,11 +179,13 @@ public CompletableFuture finish( .commitPartitionId(commitPartitionIdMessage) .timestamp(clockService.now()) .groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartition)) - .groups(toTablePartitionIdMessages(replicationGroupIds)) + .groups(toReplicationGroupIdMessages(replicationGroupIds)) + .tableIds(enlistedTableIds) .commit(commit) .commitTimestamp(commitTimestamp) .enlistmentConsistencyToken(consistencyToken) - .build()); + .build() + ); } /** @@ -251,29 +259,29 @@ public MessagingService messagingService() { return messagingService; } - private static @Nullable List toTablePartitionIdMessages( - @Nullable Collection tablePartitionIds + private static @Nullable List toReplicationGroupIdMessages( + @Nullable Collection replicationGroupIds ) { - if (tablePartitionIds == null) { + if (replicationGroupIds == null) { return null; } - var messages = new ArrayList(tablePartitionIds.size()); + var messages = new ArrayList(replicationGroupIds.size()); - for (TablePartitionId tablePartitionId : tablePartitionIds) { - messages.add(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId)); + for (ReplicationGroupId tablePartitionId : replicationGroupIds) { + messages.add(toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId)); } return messages; } - private static Map toTablePartitionIdMessages( - Map replicationGroupIds + private static Map toReplicationGroupIdMessages( + Map replicationGroupIds ) { - var messages = new HashMap(replicationGroupIds.size()); + var messages = new HashMap(replicationGroupIds.size()); - for (Map.Entry e : replicationGroupIds.entrySet()) { - messages.put(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, e.getKey()), e.getValue()); + for (Map.Entry e : replicationGroupIds.entrySet()) { + messages.put(toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, e.getKey()), e.getValue()); } return messages; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java index 6538ba7cb54..3396ce213b3 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.TopologyService; -import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.message.ReplicaResponse; import org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler; import org.apache.ignite.internal.util.CompletableFutures; @@ -67,14 +67,14 @@ public WriteIntentSwitchProcessor( * Run switch write intent on the provided node. */ public CompletableFuture switchLocalWriteIntents( - TablePartitionId tablePartitionId, + ReplicationGroupId replicationGroupId, UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp ) { String localNodeName = topologyService.localMember().name(); - return txMessageSender.switchWriteIntents(localNodeName, tablePartitionId, txId, commit, commitTimestamp); + return txMessageSender.switchWriteIntents(localNodeName, replicationGroupId, txId, commit, commitTimestamp); } /** @@ -84,7 +84,7 @@ public CompletableFuture switchWriteIntentsWithRetry( boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId, - TablePartitionId partitionId + ReplicationGroupId partitionId ) { return placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(partitionId) .thenCompose(leaseHolder -> diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java index 060e40af38c..5cd2e6c0b23 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfo.java @@ -20,7 +20,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.UUID; -import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ReplicationGroupId; /** * The result of a replicated cleanup request. @@ -31,9 +31,9 @@ public class CleanupReplicatedInfo implements Serializable { private final UUID txId; - private final Collection partitions; + private final Collection partitions; - public CleanupReplicatedInfo(UUID txId, Collection partitions) { + public CleanupReplicatedInfo(UUID txId, Collection partitions) { this.txId = txId; this.partitions = partitions; } @@ -42,7 +42,7 @@ public UUID txId() { return txId; } - public Collection partitions() { + public Collection partitions() { return partitions; } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java index d3984f2a8c7..76025e233c1 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java @@ -22,8 +22,8 @@ import java.util.UUID; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; /** Message for transferring a {@link CleanupReplicatedInfo}. */ @Transferable(TxMessageGroup.CLEANUP_REPLICATED_INFO_MESSAGE) @@ -32,15 +32,15 @@ public interface CleanupReplicatedInfoMessage extends NetworkMessage { UUID txId(); /** Partitions. */ - List partitions(); + List partitions(); /** Converts to {@link CleanupReplicatedInfo}. */ default CleanupReplicatedInfo asCleanupReplicatedInfo() { - List partitionMessages = partitions(); - List partitions = new ArrayList<>(partitionMessages.size()); + List partitionMessages = partitions(); + List partitions = new ArrayList<>(partitionMessages.size()); - for (int i = 0; i < partitionMessages.size(); i++) { - partitions.add(partitionMessages.get(i).asTablePartitionId()); + for (ReplicationGroupIdMessage partitionMessage : partitionMessages) { + partitions.add(partitionMessage.asReplicationGroupId()); } return new CleanupReplicatedInfo(txId(), partitions); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java index b247285483c..13be38c3299 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java @@ -21,7 +21,7 @@ import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.replicator.message.TimestampAware; import org.jetbrains.annotations.Nullable; @@ -43,7 +43,7 @@ public interface TxCleanupMessage extends TimestampAware { * * @return Replication groups aggregated by expected primary replica nodes. */ - @Nullable List groups(); + @Nullable List groups(); /** * Returns {@code True} if a commit request. diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java index d73c81eb56d..87ab773eb3d 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java @@ -18,10 +18,12 @@ package org.apache.ignite.internal.tx.message; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; import org.apache.ignite.internal.replicator.message.TimestampAware; import org.jetbrains.annotations.Nullable; @@ -62,5 +64,8 @@ public interface TxFinishReplicaRequest extends PrimaryReplicaRequest, Timestamp @Nullable HybridTimestamp commitTimestamp(); /** Enlisted partition groups aggregated by expected primary replica nodes. */ - Map groups(); + Map groups(); + + /** IDs of tables enlisted in the transaction. */ + Set tableIds(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java index fc9f3a333ba..56551688e3c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java @@ -20,8 +20,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.tx.TransactionMeta; import org.apache.ignite.internal.tx.TxMeta; @@ -29,15 +29,15 @@ @Transferable(TxMessageGroup.TX_META_MESSAGE) public interface TxMetaMessage extends TransactionMetaMessage { /** List of enlisted partition groups. */ - List enlistedPartitions(); + List enlistedPartitions(); /** Converts to {@link TxMeta}. */ default TxMeta asTxMeta() { - List enlistedPartitionMessages = enlistedPartitions(); - var enlistedPartitions = new ArrayList(enlistedPartitionMessages.size()); + List enlistedPartitionMessages = enlistedPartitions(); + var enlistedPartitions = new ArrayList(enlistedPartitionMessages.size()); for (int i = 0; i < enlistedPartitionMessages.size(); i++) { - enlistedPartitions.add(enlistedPartitionMessages.get(i).asTablePartitionId()); + enlistedPartitions.add(enlistedPartitionMessages.get(i).asReplicationGroupId()); } return new TxMeta(txState(), enlistedPartitions, commitTimestamp()); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java index f85a9442670..8140b96364e 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; @@ -137,7 +138,7 @@ void testCleanupAllNodes() { TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); - Map partitions = Map.of( + Map partitions = Map.of( tablePartitionId1, LOCAL_NODE.name(), tablePartitionId2, LOCAL_NODE.name(), tablePartitionId3, LOCAL_NODE.name()); @@ -161,7 +162,7 @@ void testPrimaryNotFoundForSomeAfterException() { TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); - Map partitions = Map.of( + Map partitions = Map.of( tablePartitionId1, LOCAL_NODE.name(), tablePartitionId2, LOCAL_NODE.name(), tablePartitionId3, LOCAL_NODE.name()); @@ -197,7 +198,7 @@ void testPrimaryNotFoundForAll() { TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); TablePartitionId tablePartitionId3 = new TablePartitionId(3, 0); - Map partitions = Map.of( + Map partitions = Map.of( tablePartitionId1, LOCAL_NODE.name(), tablePartitionId2, LOCAL_NODE.name(), tablePartitionId3, LOCAL_NODE.name()); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 2cc40815b31..014c4489de6 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -78,7 +78,9 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.IgniteAbstractTest; @@ -228,11 +230,11 @@ public void testEnlist() { InternalTransaction tx = txManager.beginExplicitRw(hybridTimestampTracker, InternalTxOptions.defaults()); - TablePartitionId tablePartitionId = new TablePartitionId(1, 0); + ReplicationGroupId partitionIdForEnlistment = new ZonePartitionId(1, 0); - tx.enlist(tablePartitionId, new IgniteBiTuple<>(REMOTE_NODE, 1L)); + tx.enlist(partitionIdForEnlistment, 10, new IgniteBiTuple<>(REMOTE_NODE, 1L)); - assertEquals(new IgniteBiTuple<>(REMOTE_NODE, 1L), tx.enlistedNodeAndConsistencyToken(tablePartitionId)); + assertEquals(new IgniteBiTuple<>(REMOTE_NODE, 1L), tx.enlistedNodeAndConsistencyToken(partitionIdForEnlistment)); } @Test @@ -310,7 +312,7 @@ public void testRepeatedCommitRollbackAfterCommit() throws Exception { TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); + tx.enlist(tablePartitionId1, tablePartitionId1.tableId(), new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); tx.commit(); @@ -331,7 +333,7 @@ public void testRepeatedCommitRollbackAfterRollback() throws Exception { TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); + tx.enlist(tablePartitionId1, tablePartitionId1.tableId(), new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); tx.rollback(); @@ -359,7 +361,7 @@ void testRepeatedCommitRollbackAfterCommitWithException() throws Exception { TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); + tx.enlist(tablePartitionId1, tablePartitionId1.tableId(), new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); TransactionException transactionException = assertThrows(TransactionException.class, tx::commit); @@ -387,7 +389,7 @@ public void testRepeatedCommitRollbackAfterRollbackWithException() throws Except TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); + tx.enlist(tablePartitionId1, tablePartitionId1.tableId(), new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); TransactionException transactionException = assertThrows(TransactionException.class, tx::rollback); @@ -627,11 +629,11 @@ public void testOnlyPrimaryExpirationAffectsTransaction() { ClusterNode node = mock(ClusterNode.class); TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(node, 1L)); + tx.enlist(tablePartitionId1, tablePartitionId1.tableId(), new IgniteBiTuple<>(node, 1L)); tx.assignCommitPartition(tablePartitionId1); - TablePartitionId tablePartitionId2 = new TablePartitionId(2, 0); - tx.enlist(tablePartitionId2, new IgniteBiTuple<>(node, 1L)); + ReplicationGroupId partitionIdForEnlistment2 = new TablePartitionId(2, 0); + tx.enlist(partitionIdForEnlistment2, 20, new IgniteBiTuple<>(node, 1L)); when(placementDriver.getPrimaryReplica(eq(tablePartitionId1), any())) .thenReturn(completedFuture( @@ -640,9 +642,9 @@ public void testOnlyPrimaryExpirationAffectsTransaction() { .thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), HybridTimestamp.MAX_VALUE))); - lenient().when(placementDriver.getPrimaryReplica(eq(tablePartitionId2), any())) + lenient().when(placementDriver.getPrimaryReplica(eq(partitionIdForEnlistment2), any())) .thenReturn(nullCompletedFuture()); - lenient().when(placementDriver.awaitPrimaryReplica(eq(tablePartitionId2), any(), anyLong(), any())) + lenient().when(placementDriver.awaitPrimaryReplica(eq(partitionIdForEnlistment2), any(), anyLong(), any())) .thenReturn(completedFuture( new TestReplicaMetaImpl(LOCAL_NODE, hybridTimestamp(1), hybridTimestamp(10)))); @@ -768,7 +770,7 @@ private InternalTransaction prepareTransaction() { TablePartitionId tablePartitionId1 = new TablePartitionId(1, 0); - tx.enlist(tablePartitionId1, new IgniteBiTuple<>(REMOTE_NODE, 1L)); + tx.enlist(tablePartitionId1, tablePartitionId1.tableId(), new IgniteBiTuple<>(REMOTE_NODE, 1L)); tx.assignCommitPartition(tablePartitionId1); return tx; diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java index 57f836f9d62..dbc963e1f76 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.network.ClusterNodeImpl; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxState; @@ -63,6 +64,7 @@ class ReadWriteTransactionImplTest extends BaseIgniteAbstractTest { private static final IgniteBiTuple NODE_AND_TOKEN = new IgniteBiTuple(CLUSTER_NODE, 0L); private static final int TABLE_ID = 1; + private static final int ZONE_ID = 2; /** Transaction commit partition id. */ public static final TablePartitionId TX_COMMIT_PART = new TablePartitionId(TABLE_ID, 0); @@ -96,8 +98,8 @@ public void effectiveSchemaTimestampIsBeginTimestamp() { private void startTxAndTryToEnlist(boolean commit) { HashSet finishedTxs = new HashSet<>(); - Mockito.when(txManager.finish(any(), any(), anyBoolean(), any(), any())).thenAnswer(invocation -> { - finishedTxs.add(invocation.getArgument(4)); + Mockito.when(txManager.finish(any(), any(), anyBoolean(), any(), any(), any())).thenAnswer(invocation -> { + finishedTxs.add(invocation.getArgument(5)); return nullCompletedFuture(); }); @@ -120,8 +122,8 @@ private void startTxAndTryToEnlist(boolean commit) { tx.assignCommitPartition(TX_COMMIT_PART); - tx.enlist(new TablePartitionId(TABLE_ID, 0), NODE_AND_TOKEN); - tx.enlist(new TablePartitionId(TABLE_ID, 2), NODE_AND_TOKEN); + tx.enlist(new ZonePartitionId(ZONE_ID, 0), TABLE_ID, NODE_AND_TOKEN); + tx.enlist(new ZonePartitionId(ZONE_ID, 2), TABLE_ID, NODE_AND_TOKEN); if (commit) { if (txState == null) { @@ -138,11 +140,11 @@ private void startTxAndTryToEnlist(boolean commit) { } TransactionException ex = assertThrows(TransactionException.class, - () -> tx.enlist(new TablePartitionId(TABLE_ID, 5), NODE_AND_TOKEN)); + () -> tx.enlist(new ZonePartitionId(ZONE_ID, 5), TABLE_ID, NODE_AND_TOKEN)); assertTrue(ex.getMessage().contains(txState.toString())); - ex = assertThrows(TransactionException.class, () -> tx.enlist(new TablePartitionId(TABLE_ID, 0), NODE_AND_TOKEN)); + ex = assertThrows(TransactionException.class, () -> tx.enlist(new ZonePartitionId(ZONE_ID, 0), TABLE_ID, NODE_AND_TOKEN)); assertTrue(ex.getMessage().contains(txState.toString())); } diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java index da17b7a9c43..04bb6f4e9f4 100644 --- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java +++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.tx.TxMeta; @@ -148,7 +149,7 @@ private void testPutGetRemove0(BiConsumer> re } } - private List generateEnlistedPartitions(int c) { + private List generateEnlistedPartitions(int c) { return IntStream.range(0, c) .mapToObj(partitionNumber -> new TablePartitionId(TABLE_ID, partitionNumber)) .collect(toList());