diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index 100f93e3bfe..ae1ed0cc9ea 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -169,6 +169,11 @@ public boolean isFinishingOrFinished() { public long timeout() { return 10_000; } + + @Override + public CompletableFuture kill() { + return nullCompletedFuture(); + } }; } @@ -235,6 +240,11 @@ public CompletableFuture vacuum() { return nullCompletedFuture(); } + @Override + public CompletableFuture kill(UUID txId) { + return nullCompletedFuture(); + } + @Override public int finished() { return 0; diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/transaction/ItTransactionControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/transaction/ItTransactionControllerTest.java index 0c0864adecf..005565a6d42 100644 --- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/transaction/ItTransactionControllerTest.java +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/transaction/ItTransactionControllerTest.java @@ -26,7 +26,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; import io.micronaut.core.type.Argument; import io.micronaut.http.HttpRequest; @@ -76,7 +75,7 @@ void shouldReturnAllTransactions() { assertThat(transactionInfo, notNullValue()); assertThat(transactionInfo.type(), is("READ_ONLY")); - assertThat(transactionInfo.state(), nullValue()); + assertThat(transactionInfo.state(), is("PENDING")); assertThat(transactionInfo.priority(), is("NORMAL")); roTx.rollback(); @@ -103,7 +102,7 @@ void shouldReturnTransactionById() { { assertThat(roTransactionInfo, notNullValue()); assertThat(roTransactionInfo.type(), is("READ_ONLY")); - assertThat(roTransactionInfo.state(), nullValue()); + assertThat(roTransactionInfo.state(), is("PENDING")); assertThat(roTransactionInfo.priority(), is("NORMAL")); roTx.rollback(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java index 65d625e5592..d7e991aa561 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.schema.row.RowAssembler; import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.Tuple; @@ -108,7 +109,13 @@ public void testImplicit() { ignite.tables().table(TABLE_NAME).keyValueView().get(null, Tuple.create().set("id", 12)); ignite.tables().table(TABLE_NAME).keyValueView().getAll(null, Set.of(Tuple.create().set("id", 12))); - int txRwStatesAfter = txManager.states().size(); + int txRwStatesAfter = 0; + + for (TxStateMeta stateMeta : txManager.states()) { + if (stateMeta.tx() == null || !stateMeta.tx().isReadOnly()) { + txRwStatesAfter++; + } + } int txFinishedAfter = txManager.finished(); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java index 4e9f4d5c1b8..68b11f78595 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java @@ -192,6 +192,11 @@ public IgniteBiTuple enlist(TablePartitionId tablePartitionId return nodeAndConsistencyToken; } + @Override + public CompletableFuture kill() { + return rollbackAsync(); + } + /** Returns a {@link CompletableFuture} that completes when this transaction commits. */ public CompletableFuture commitFuture() { return commitFut; diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java index 507622bdef4..b775743c679 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java @@ -145,13 +145,13 @@ private void testTransaction(Consumer touchOp, boolean checkAfterTo ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl) testCluster.igniteTransactions().begin(); - checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null), List.of(0)); + checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null), List.of(0)); checkLocalTxStateOnNodes(tx.id(), null, IntStream.range(1, NODES).boxed().collect(toList())); touchOp.accept(tx); if (checkAfterTouch) { - checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null)); + checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null)); } if (commit) { @@ -166,7 +166,8 @@ private void testTransaction(Consumer touchOp, boolean checkAfterTo commit ? COMMITTED : ABORTED, coordinatorId, tx.commitPartition(), - commit ? testCluster.clockServices.get(coord.name()).now() : null + commit ? testCluster.clockServices.get(coord.name()).now() : null, + null ) ); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 833ac282f7b..7fb347d0558 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -762,7 +762,8 @@ private void replicaTouch(UUID txId, UUID txCoordinatorId, HybridTimestamp commi full ? COMMITTED : PENDING, txCoordinatorId, old == null ? null : old.commitPartitionId(), - full ? commitTimestamp : null + full ? commitTimestamp : null, + old == null ? null : old.tx() )); } @@ -772,6 +773,7 @@ private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp c old == null ? null : old.txCoordinatorId(), old == null ? partId : old.commitPartitionId(), commit ? commitTimestamp : null, + old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), old == null ? null : old.cleanupCompletionTimestamp() )); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 1bdec1e4226..4e72c1a59ab 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -516,7 +516,8 @@ private CompletableFuture processRequest(ReplicaRequest request, @Nullable Bo PENDING, req.coordinatorId(), req.commitPartitionId().asTablePartitionId(), - null + null, + old == null ? null : old.tx() )); } } @@ -811,7 +812,8 @@ private CompletableFuture processOperationRequest( PENDING, req.coordinatorId(), req.commitPartitionId().asTablePartitionId(), - null + null, + old == null ? null : old.tx() )); var opId = new OperationId(senderId, req.timestamp().longValue()); @@ -3945,6 +3947,7 @@ private void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp old == null ? null : old.txCoordinatorId(), old == null ? null : old.commitPartitionId(), txState == COMMITTED ? commitTimestamp : null, + old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), old == null ? null : old.cleanupCompletionTimestamp() )); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 142902a73aa..7b8f67e32d8 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -730,7 +730,13 @@ public void testTxStateReplicaRequestEmptyState() throws Exception { doAnswer(invocation -> { UUID txId = invocation.getArgument(4); - txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta( + ABORTED, + localNode.id(), + commitPartitionId, + null, + null + )); return nullCompletedFuture(); }).when(txManager).finish(any(), any(), anyBoolean(), any(), any()); @@ -776,7 +782,7 @@ void testExecuteRequestOnFinishedTx(TxState txState, RequestType requestType) { UUID txId = newTxId(); txStateStorage.putForRebalance(txId, new TxMeta(txState, singletonList(grpId), null)); - txManager.updateTxMeta(txId, old -> new TxStateMeta(txState, null, null, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(txState, null, null, null, null)); BinaryRow testRow = binaryRow(0); @@ -911,7 +917,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() thr pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); - txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now())); + txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now(), null)); CompletableFuture fut = doReadOnlySingleGet(testBinaryKey); @@ -929,7 +935,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() throw pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); - txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null, null)); CompletableFuture fut = doReadOnlySingleGet(testBinaryKey); @@ -948,7 +954,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() throw pkStorage().put(testBinaryRow, rowId); testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID); - txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null, null)); CompletableFuture fut = doReadOnlySingleGet(testBinaryKey); @@ -1618,7 +1624,7 @@ private void testWriteIntentOnPrimaryReplica( // Imitation of tx commit. txStateStorage.putForRebalance(txId, new TxMeta(COMMITTED, new ArrayList<>(), now)); - txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, now)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, now, null)); CompletableFuture replicaCleanupFut = partitionReplicaListener.invoke( TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() @@ -2876,7 +2882,7 @@ private static ReadOnlyDirectMultiRowReplicaRequest readOnlyDirectMultiRowReplic private void cleanup(UUID txId) { HybridTimestamp commitTs = clock.now(); - txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, commitTs)); + txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, commitTs, null)); WriteIntentSwitchReplicaRequest message = TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 49c007581e6..29921ef1bb1 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -2171,7 +2171,8 @@ public void testWriteIntentResolutionFallbackToCommitPartitionPath() { old.txState(), new UUID(1, 2), old.commitPartitionId(), - old.commitTimestamp() + old.commitTimestamp(), + old == null ? null : old.tx() )); } diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java new file mode 100644 index 00000000000..eb61e2e11bb --- /dev/null +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.InitParametersBuilder; +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.wrapper.Wrappers; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.tx.Transaction; +import org.apache.ignite.tx.TransactionException; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.Test; + +/** + * The test class contains the internal API to kill a transaction. + * The behavior of this API is similar to the rollback invocation, + * but a client has to get a specific exception when trying to interact with the transaction object. + */ +public class KillTransactionTest extends ClusterPerClassIntegrationTest { + @Override + protected void configureInitParameters(InitParametersBuilder builder) { + super.configureInitParameters(builder); + + builder.clusterConfiguration("ignite.transaction.txnResourceTtl=0"); + } + + @Test + public void testKillTransactionBeforeCommit() throws Exception { + killTransactionBeforeFinishing(true); + } + + @Test + public void testKillTransactionBeforeRollback() throws Exception { + killTransactionBeforeFinishing(false); + } + + /** + * The test creates a transaction on each node and then kills them. + * After this scenario, this test checks invariants: + * public API for the killed transaction throws an exception; + * transaction states are vacuumed. + * + * @param commit True for commit, false for rollback. + */ + private void killTransactionBeforeFinishing(boolean commit) throws Exception { + node(0).sql().executeScript("CREATE TABLE IF NOT EXISTS test (id INT PRIMARY KEY, val VARCHAR)"); + + ArrayList txs = new ArrayList<>(); + AtomicInteger keyGenerator = new AtomicInteger(ThreadLocalRandom.current().nextInt(100)); + + CLUSTER.runningNodes().forEach(node -> { + int key = keyGenerator.incrementAndGet(); + + Transaction tx = node.transactions().begin(new TransactionOptions() + .readOnly(key % 2 == 0)); + + KeyValueView kvView = node.tables().table("test").keyValueView(Integer.class, String.class); + + if (!tx.isReadOnly()) { + kvView.put(tx, key, "Test val"); + } + + String res = kvView.get(tx, key); + + if (tx.isReadOnly()) { + assertNull(res); + } else { + assertEquals("Test val", res); + } + + txs.add(tx); + }); + + CLUSTER.runningNodes().forEach(node -> { + IgniteImpl igniteImpl = Wrappers.unwrap(node, IgniteImpl.class); + + boolean txIsKilled = false; + + for (Transaction tx : txs) { + InternalTransaction internalTx = Wrappers.unwrap(tx, InternalTransaction.class); + + CompletableFuture killFut = igniteImpl.txManager().kill(internalTx.id()); + + assertThat(killFut, willCompleteSuccessfully()); + + if (killFut.join()) { + assertFalse(txIsKilled); + + txIsKilled = true; + } + } + + assertTrue(txIsKilled); + }); + + for (Transaction tx : txs) { + CompletableFuture finishFut = commit ? tx.commitAsync() : tx.rollbackAsync(); + + if (tx.isReadOnly()) { + assertThat(finishFut, willCompleteSuccessfully()); + } else { + assertThat(finishFut, willThrowWithCauseOrSuppressed( + TransactionException.class, + "Transaction is killed" + )); + } + } + + for (Transaction tx : txs) { + CompletableFuture finishFut = commit ? tx.commitAsync() : tx.rollbackAsync(); + + assertThat(finishFut, willCompleteSuccessfully()); + } + + waitForVacuum(txs); + } + + /** + * Waits for the removal of all transaction states from the volatile storage. + * + * @param txs List of transactions. + * @throws InterruptedException If the waiting would be interrupted. + */ + private void waitForVacuum(ArrayList txs) throws InterruptedException { + for (int i = 0; i < initialNodes(); i++) { + IgniteImpl igniteImpl = Wrappers.unwrap(node(i), IgniteImpl.class); + + assertTrue(IgniteTestUtils.waitForCondition(() -> { + assertThat(igniteImpl.txManager().vacuum(), willCompleteSuccessfully()); + + for (Transaction tx : txs) { + InternalTransaction internalTx = Wrappers.unwrap(tx, InternalTransaction.class); + + if (igniteImpl.txManager().stateMeta(internalTx.id()) != null) { + return false; + } + } + + return true; + }, 10_000)); + } + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java index 371b42739d3..6a1e561f45b 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java @@ -130,4 +130,11 @@ public interface InternalTransaction extends Transaction { * @return The transaction timeout. */ long timeout(); + + /** + * Kills this transaction. + * + * @return The future. + */ + CompletableFuture kill(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index fd5afce206a..e4754f05dc1 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -224,6 +224,14 @@ CompletableFuture cleanup( */ CompletableFuture vacuum(); + /** + * Kills a local transaction by its id. The behavior is similar to the transaction rollback. + * + * @param txId Transaction id. + * @return Future will be completed with value true if the transaction was started locally and completed by this call. + */ + CompletableFuture kill(UUID txId); + /** * Returns a number of finished transactions. * diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java index 7956fce3c92..3a28aaff4f8 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java @@ -21,11 +21,11 @@ import static org.apache.ignite.internal.tx.TxState.ABANDONED; import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness; -import java.util.Objects; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.tostring.IgniteToStringExclude; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.message.TxMessagesFactory; import org.apache.ignite.internal.tx.message.TxStateMetaMessage; @@ -50,6 +50,13 @@ public class TxStateMeta implements TransactionMeta { private final @Nullable Long cleanupCompletionTimestamp; + /** + * The ignite transaction object is associated with this state. This field can be initialized only on the transaction coordinator, + * {@code null} in other nodes. + */ + @IgniteToStringExclude + private final @Nullable InternalTransaction tx; + /** * Constructor. * @@ -57,14 +64,16 @@ public class TxStateMeta implements TransactionMeta { * @param txCoordinatorId Transaction coordinator id. * @param commitPartitionId Commit partition replication group id. * @param commitTimestamp Commit timestamp. + * @param tx Transaction object. This parameter is not {@code null} only for transaction coordinator. */ public TxStateMeta( TxState txState, @Nullable UUID txCoordinatorId, @Nullable TablePartitionId commitPartitionId, - @Nullable HybridTimestamp commitTimestamp + @Nullable HybridTimestamp commitTimestamp, + @Nullable InternalTransaction tx ) { - this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, null, null); + this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, tx, null); } /** @@ -74,6 +83,7 @@ public TxStateMeta( * @param txCoordinatorId Transaction coordinator id. * @param commitPartitionId Commit partition replication group id. * @param commitTimestamp Commit timestamp. + * @param tx Transaction object. This parameter is not {@code null} only for transaction coordinator. * @param initialVacuumObservationTimestamp Initial vacuum observation timestamp. */ public TxStateMeta( @@ -81,9 +91,10 @@ public TxStateMeta( @Nullable UUID txCoordinatorId, @Nullable TablePartitionId commitPartitionId, @Nullable HybridTimestamp commitTimestamp, + @Nullable InternalTransaction tx, @Nullable Long initialVacuumObservationTimestamp ) { - this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, initialVacuumObservationTimestamp, null); + this(txState, txCoordinatorId, commitPartitionId, commitTimestamp, tx, initialVacuumObservationTimestamp, null); } /** @@ -93,6 +104,7 @@ public TxStateMeta( * @param txCoordinatorId Transaction coordinator id. * @param commitPartitionId Commit partition replication group id. * @param commitTimestamp Commit timestamp. + * @param tx Transaction object. This parameter is not {@code null} only for transaction coordinator. * @param initialVacuumObservationTimestamp Initial vacuum observation timestamp. * @param cleanupCompletionTimestamp Cleanup completion timestamp. */ @@ -101,6 +113,7 @@ public TxStateMeta( @Nullable UUID txCoordinatorId, @Nullable TablePartitionId commitPartitionId, @Nullable HybridTimestamp commitTimestamp, + @Nullable InternalTransaction tx, @Nullable Long initialVacuumObservationTimestamp, @Nullable Long cleanupCompletionTimestamp ) { @@ -108,10 +121,20 @@ public TxStateMeta( this.txCoordinatorId = txCoordinatorId; this.commitPartitionId = commitPartitionId; this.commitTimestamp = commitTimestamp; + this.tx = tx; this.initialVacuumObservationTimestamp = initialVacuumObservationTimestamp; this.cleanupCompletionTimestamp = cleanupCompletionTimestamp; } + /** + * Gets a transaction object or {@code null} it the current node is not a coordinator for this transaction. + * + * @return Transaction object. + */ + public @Nullable InternalTransaction tx() { + return tx; + } + /** * Creates a transaction state for the same transaction, but this one is marked abandoned. * @@ -173,50 +196,6 @@ public TxStateMetaMessage toTransactionMetaMessage( .build(); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TxStateMeta that = (TxStateMeta) o; - - if (txState != that.txState) { - return false; - } - - if (txCoordinatorId != null ? !txCoordinatorId.equals(that.txCoordinatorId) : that.txCoordinatorId != null) { - return false; - } - - if (commitPartitionId != null ? !commitPartitionId.equals(that.commitPartitionId) : that.commitPartitionId != null) { - return false; - } - - if (commitTimestamp != null ? !commitTimestamp.equals(that.commitTimestamp) : that.commitTimestamp != null) { - return false; - } - - if (initialVacuumObservationTimestamp != null - ? !initialVacuumObservationTimestamp.equals(that.initialVacuumObservationTimestamp) - : that.initialVacuumObservationTimestamp != null - ) { - return false; - } - - return cleanupCompletionTimestamp != null - ? cleanupCompletionTimestamp.equals(that.cleanupCompletionTimestamp) - : that.cleanupCompletionTimestamp == null; - } - - @Override - public int hashCode() { - return Objects.hash(txState, txCoordinatorId, commitPartitionId, commitTimestamp, initialVacuumObservationTimestamp, - cleanupCompletionTimestamp); - } - @Override public String toString() { return S.toString(TxStateMeta.class, this); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java index eaf85b0052b..55ca4ff8962 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java @@ -47,7 +47,7 @@ public TxStateMetaAbandoned( UUID txCoordinatorId, TablePartitionId commitPartitionId ) { - super(ABANDONED, txCoordinatorId, commitPartitionId, null); + super(ABANDONED, txCoordinatorId, commitPartitionId, null, null); this.lastAbandonedMarkerTs = FastTimestamps.coarseCurrentTimeMillis(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java index 4bd378f5a46..ca4246b52dd 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java @@ -45,7 +45,7 @@ public class TxStateMetaFinishing extends TxStateMeta { * @param commitPartitionId Commit partition id. */ public TxStateMetaFinishing(@Nullable UUID txCoordinatorId, @Nullable TablePartitionId commitPartitionId) { - super(TxState.FINISHING, txCoordinatorId, commitPartitionId, null); + super(TxState.FINISHING, txCoordinatorId, commitPartitionId, null, null); } /** diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java index d8bd384280d..73787429772 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PublicApiThreadingTransaction.java @@ -150,4 +150,9 @@ public long timeout() { public T unwrap(Class classToUnwrap) { return classToUnwrap.cast(transaction); } + + @Override + public CompletableFuture kill() { + return transaction.kill(); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java index d6c3d99e066..24dd0210d09 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java @@ -144,4 +144,9 @@ public CompletableFuture finish(boolean commit, HybridTimestamp executionT public boolean isFinishingOrFinished() { return finishGuard.get(); } + + @Override + public CompletableFuture kill() { + return finish(false, readTimestamp, false); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index 016c9939893..8e5cbc510e4 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.tx.impl; +import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; @@ -59,6 +60,8 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { /** The future is initialized when this transaction starts committing or rolling back and is finished together with the transaction. */ private volatile CompletableFuture finishFuture; + private boolean killed; + /** * Constructs an explicit read-write transaction. * @@ -159,18 +162,63 @@ public CompletableFuture finish(boolean commit, @Nullable HybridTimestamp return finishFuture; } + return finishInternal(commit, executionTimestamp, full, true); + } + + /** + * Finishes the read-write transaction. + * + * @param commit Commit flag. + * @param executionTimestamp The timestamp is the time when the transaction is applied to the remote node. + * @param full Full state transaction marker. + * @param isComplete The flag is true if the transaction is completed through the public API, false for {@link this#kill()} invocation. + * @return The future. + */ + private CompletableFuture finishInternal( + boolean commit, + @Nullable HybridTimestamp executionTimestamp, + boolean full, + boolean isComplete + ) { enlistPartitionLock.writeLock().lock(); try { if (finishFuture == null) { + if (killed) { + if (isComplete) { + finishFuture = nullCompletedFuture(); + + return failedFuture(new TransactionException( + TX_ALREADY_FINISHED_ERR, + format("Transaction is killed [id={}, state={}].", id(), state()) + )); + } else { + return nullCompletedFuture(); + } + } + if (full) { txManager.finishFull(observableTsTracker, id(), executionTimestamp, commit); - finishFuture = nullCompletedFuture(); + if (isComplete) { + finishFuture = nullCompletedFuture(); + } else { + killed = true; + } } else { - CompletableFuture finishFutureInternal = finishInternal(commit); + CompletableFuture finishFutureInternal = txManager.finish( + observableTsTracker, + commitPart, + commit, + enlisted, + id() + ); - finishFuture = finishFutureInternal.handle((unused, throwable) -> null); + if (isComplete) { + finishFuture = finishFutureInternal.handle((unused, throwable) -> null); + } else { + killed = true; + } // Return the real future first time. return finishFutureInternal; @@ -188,16 +236,6 @@ public boolean isFinishingOrFinished() { return finishFuture != null; } - /** - * Internal method for finishing this transaction. - * - * @param commit {@code true} to commit, false to rollback. - * @return The future of transaction completion. - */ - private CompletableFuture finishInternal(boolean commit) { - return txManager.finish(observableTsTracker, commitPart, commit, enlisted, id()); - } - /** {@inheritDoc} */ @Override public boolean isReadOnly() { @@ -214,4 +252,9 @@ public HybridTimestamp readTimestamp() { public HybridTimestamp startTimestamp() { return TransactionIds.beginTimestamp(id()); } + + @Override + public CompletableFuture kill() { + return finishInternal(false, null, false, false); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java index 6334a5b6910..b2e268a874e 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java @@ -114,12 +114,15 @@ private void markTxnCleanupReplicated(UUID txId, TxState state, TablePartitionId long cleanupCompletionTimestamp = System.currentTimeMillis(); txStateVolatileStorage.updateMeta(txId, oldMeta -> - new TxStateMeta(oldMeta == null ? state : oldMeta.txState(), + new TxStateMeta( + oldMeta == null ? state : oldMeta.txState(), oldMeta == null ? null : oldMeta.txCoordinatorId(), commitPartitionId, oldMeta == null ? null : oldMeta.commitTimestamp(), + oldMeta == null ? null : oldMeta.tx(), oldMeta == null ? null : oldMeta.initialVacuumObservationTimestamp(), - cleanupCompletionTimestamp) + cleanupCompletionTimestamp + ) ); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 964726432ac..05fb1f07a09 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -408,22 +408,31 @@ private InternalTransaction beginBusy( boolean readOnly, InternalTxOptions options ) { - HybridTimestamp beginTimestamp = readOnly ? clockService.now() : createBeginTimestampWithIncrementRwTxCounter(); - UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); - startedTxs.add(1); - if (!readOnly) { - txStateVolatileStorage.initialize(txId, localNodeId); + InternalTransaction tx; + + if (readOnly) { + HybridTimestamp beginTimestamp = clockService.now(); + + UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); + + tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp, txId, implicit, options); + } else { + HybridTimestamp beginTimestamp = createBeginTimestampWithIncrementRwTxCounter(); + + UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, options.priority()); // TODO: RW timeouts will be supported in https://issues.apache.org/jira/browse/IGNITE-24244 // long timeout = options.timeoutMillis() == 0 ? txConfig.readWriteTimeout().value() : options.timeoutMillis(); long timeout = 3_000; - return new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, timeout); - } else { - return beginReadOnlyTransaction(timestampTracker, beginTimestamp, txId, implicit, options); + tx = new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId, implicit, timeout); } + + txStateVolatileStorage.initialize(tx); + + return tx; } private ReadOnlyTransactionImpl beginReadOnlyTransaction( @@ -553,7 +562,8 @@ public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, @Null finalState, old == null ? null : old.txCoordinatorId(), old == null ? null : old.commitPartitionId(), - ts + ts, + old == null ? null : old.tx() )); decrementRwTxCount(txId); @@ -580,7 +590,11 @@ public CompletableFuture finish( if (enlistedGroups.isEmpty()) { // If there are no enlisted groups, just update local state - we already marked the tx as finished. updateTxMeta(txId, old -> new TxStateMeta( - commitIntent ? COMMITTED : ABORTED, localNodeId, commitPartition, commitTimestamp(commitIntent) + commitIntent ? COMMITTED : ABORTED, + localNodeId, + commitPartition, + commitTimestamp(commitIntent), + old == null ? null : old.tx() )); decrementRwTxCount(txId); @@ -727,6 +741,7 @@ private CompletableFuture durableFinish( old == null ? null : old.txCoordinatorId(), commitPartition, result.commitTimestamp(), + old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), old == null ? null : old.cleanupCompletionTimestamp() ) @@ -793,6 +808,7 @@ private CompletableFuture makeFinishRequest( localNodeId, old == null ? null : old.commitPartitionId(), txResult.commitTimestamp(), + old == null ? null : old.tx(), old == null ? null : old.initialVacuumObservationTimestamp(), old == null ? null : old.cleanupCompletionTimestamp() )); @@ -960,6 +976,22 @@ public CompletableFuture vacuum() { persistentTxStateVacuumizer::vacuumPersistentTxStates); } + @Override + public CompletableFuture kill(UUID txId) { + TxStateMeta state = txStateVolatileStorage.state(txId); + + if (state != null && state.tx() != null) { + // TODO: IGNITE-24382 Kill implicit read-write transaction. + if (!state.tx().isReadOnly() && state.tx().implicit()) { + return falseCompletedFuture(); + } + + return state.tx().kill().thenApply(unused -> true); + } + + return falseCompletedFuture(); + } + @Override public CompletableFuture executeWriteIntentSwitchAsync(Runnable runnable) { return runAsync(runnable, writeIntentSwitchPool); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java index b4629d9c0cf..f5f2fb78818 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.PersistentTxStateVacuumResult; @@ -68,13 +69,12 @@ public void stop() { /** * Initializes the meta state for a created transaction. * - * @param txId Transaction id. - * @param txCrdId Transaction coordinator id. + * @param tx Transaction object. */ - public void initialize(UUID txId, UUID txCrdId) { - TxStateMeta previous = txStateMap.put(txId, new TxStateMeta(PENDING, txCrdId, null, null)); + public void initialize(InternalTransaction tx) { + TxStateMeta previous = txStateMap.put(tx.id(), new TxStateMeta(PENDING, tx.coordinatorId(), null, null, tx)); - assert previous == null : "Transaction state has already defined [txId=" + txCrdId + ", state=" + previous.txState() + ']'; + assert previous == null : "Transaction state has already defined [txId=" + tx.id() + ", state=" + previous.txState() + ']'; } /** @@ -158,7 +158,13 @@ public CompletableFuture vacuum( txStateMap.forEach((txId, meta) -> { txStateMap.computeIfPresent(txId, (txId0, meta0) -> { - if (TxState.isFinalState(meta0.txState())) { + if (meta0.tx() != null && meta0.tx().isReadOnly()) { + if (meta0.tx().isFinishingOrFinished()) { + vacuumizedTxnsCount.incrementAndGet(); + + return null; + } + } else if (TxState.isFinalState(meta0.txState())) { Long initialVacuumObservationTimestamp = meta0.initialVacuumObservationTimestamp(); if (initialVacuumObservationTimestamp == null && txnResourceTtl > 0) { @@ -192,10 +198,10 @@ public CompletableFuture vacuum( return meta0; } } - } else { - skippedForFurtherProcessingUnfinishedTxnsCount.incrementAndGet(); - return meta0; } + + skippedForFurtherProcessingUnfinishedTxnsCount.incrementAndGet(); + return meta0; }); }); @@ -248,6 +254,7 @@ private static TxStateMeta markInitialVacuumObservationTimestamp(TxStateMeta met meta.txCoordinatorId(), meta.commitPartitionId(), meta.commitTimestamp(), + meta.tx(), vacuumObservationTimestamp, meta.cleanupCompletionTimestamp() ); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java index 40103884ae6..d08e0a1a5af 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java @@ -48,6 +48,7 @@ default TxStateMeta asTxStateMeta() { txCoordinatorId(), commitPartitionId == null ? null : commitPartitionId.asTablePartitionId(), commitTimestamp(), + null, initialVacuumObservationTimestamp(), cleanupCompletionTimestamp() ); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java index f5b34d7004f..b051035e4d7 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/views/TransactionsViewProvider.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.tx.views; +import static org.apache.ignite.internal.tx.TxState.PENDING; import static org.apache.ignite.internal.tx.TxState.isFinalState; import static org.apache.ignite.internal.type.NativeTypes.stringOf; @@ -108,6 +109,7 @@ public Iterator iterator() { new TransformingIterator<>(roTxIds.iterator(), TxInfo::readOnly), rwTxStates.entrySet().stream() .filter(e -> localNodeId.equals(e.getValue().txCoordinatorId()) + && e.getValue().tx() != null && !e.getValue().tx().isReadOnly() && !isFinalState(e.getValue().txState())) .map(e -> TxInfo.readWrite(e.getKey(), e.getValue().txState())) .iterator() @@ -123,7 +125,7 @@ static class TxInfo { private final String priority; static TxInfo readOnly(UUID id) { - return new TxInfo(id, null, true); + return new TxInfo(id, PENDING, true); } static TxInfo readWrite(UUID id, TxState txState) { diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java index d106715ee1b..3df5bac5af2 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java @@ -184,7 +184,7 @@ void testNoTriggerCommittedState() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta committedState = new TxStateMeta(TxState.COMMITTED, LOCAL_NODE.id(), tpId, clock.now()); + TxStateMeta committedState = new TxStateMeta(TxState.COMMITTED, LOCAL_NODE.id(), tpId, clock.now(), null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> committedState); @@ -216,7 +216,7 @@ void testNoTriggerAbortedState() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta abortedState = new TxStateMeta(TxState.ABORTED, LOCAL_NODE.id(), tpId, null); + TxStateMeta abortedState = new TxStateMeta(TxState.ABORTED, LOCAL_NODE.id(), tpId, null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> abortedState); @@ -245,7 +245,7 @@ void testNoTriggerFinishingState() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta finishingState = new TxStateMeta(TxState.FINISHING, LOCAL_NODE.id(), tpId, null); + TxStateMeta finishingState = new TxStateMeta(TxState.FINISHING, LOCAL_NODE.id(), tpId, null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> finishingState); @@ -277,7 +277,7 @@ void testNoTriggerCoordinatorAlive() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null); + TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> pendingState); @@ -311,7 +311,7 @@ void testTriggerOnLockConflictCoordinatorDead() { UUID concurrentTxId = idGenerator.transactionIdFor(clock.now()); - TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null); + TxStateMeta pendingState = new TxStateMeta(TxState.PENDING, LOCAL_NODE.id(), tpId, null, null); txStateMetaStorage.updateMeta(orphanTxId, stateMeta -> pendingState); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java index 44721211063..57f836f9d62 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java @@ -104,7 +104,7 @@ private void startTxAndTryToEnlist(boolean commit) { Mockito.when(txManager.stateMeta(any())).thenAnswer(invocation -> { if (finishedTxs.contains(invocation.getArgument(0))) { - return new TxStateMeta(txState, randomUUID(), TX_COMMIT_PART, null); + return new TxStateMeta(txState, randomUUID(), TX_COMMIT_PART, null, null); } return null;