Skip to content

Commit

Permalink
IGNITE-23680 Add the ability to rollback any transaction by ID (#5146)
Browse files Browse the repository at this point in the history
  • Loading branch information
vldpyatkov authored Feb 4, 2025
1 parent 220c5f7 commit 56f674a
Show file tree
Hide file tree
Showing 25 changed files with 407 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public boolean isFinishingOrFinished() {
public long timeout() {
return 10_000;
}

@Override
public CompletableFuture<Void> kill() {
return nullCompletedFuture();
}
};
}

Expand Down Expand Up @@ -235,6 +240,11 @@ public CompletableFuture<Void> vacuum() {
return nullCompletedFuture();
}

@Override
public CompletableFuture<Boolean> kill(UUID txId) {
return nullCompletedFuture();
}

@Override
public int finished() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
Expand Down Expand Up @@ -76,7 +75,7 @@ void shouldReturnAllTransactions() {

assertThat(transactionInfo, notNullValue());
assertThat(transactionInfo.type(), is("READ_ONLY"));
assertThat(transactionInfo.state(), nullValue());
assertThat(transactionInfo.state(), is("PENDING"));
assertThat(transactionInfo.priority(), is("NORMAL"));

roTx.rollback();
Expand All @@ -103,7 +102,7 @@ void shouldReturnTransactionById() {
{
assertThat(roTransactionInfo, notNullValue());
assertThat(roTransactionInfo.type(), is("READ_ONLY"));
assertThat(roTransactionInfo.state(), nullValue());
assertThat(roTransactionInfo.state(), is("PENDING"));
assertThat(roTransactionInfo.priority(), is("NORMAL"));

roTx.rollback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
Expand Down Expand Up @@ -108,7 +109,13 @@ public void testImplicit() {
ignite.tables().table(TABLE_NAME).keyValueView().get(null, Tuple.create().set("id", 12));
ignite.tables().table(TABLE_NAME).keyValueView().getAll(null, Set.of(Tuple.create().set("id", 12)));

int txRwStatesAfter = txManager.states().size();
int txRwStatesAfter = 0;

for (TxStateMeta stateMeta : txManager.states()) {
if (stateMeta.tx() == null || !stateMeta.tx().isReadOnly()) {
txRwStatesAfter++;
}
}

int txFinishedAfter = txManager.finished();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId tablePartitionId
return nodeAndConsistencyToken;
}

@Override
public CompletableFuture<Void> kill() {
return rollbackAsync();
}

/** Returns a {@link CompletableFuture} that completes when this transaction commits. */
public CompletableFuture<Void> commitFuture() {
return commitFut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ private void testTransaction(Consumer<Transaction> touchOp, boolean checkAfterTo

ReadWriteTransactionImpl tx = (ReadWriteTransactionImpl) testCluster.igniteTransactions().begin();

checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null), List.of(0));
checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null), List.of(0));
checkLocalTxStateOnNodes(tx.id(), null, IntStream.range(1, NODES).boxed().collect(toList()));

touchOp.accept(tx);

if (checkAfterTouch) {
checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null));
checkLocalTxStateOnNodes(tx.id(), new TxStateMeta(PENDING, coordinatorId, tx.commitPartition(), null, null));
}

if (commit) {
Expand All @@ -166,7 +166,8 @@ private void testTransaction(Consumer<Transaction> touchOp, boolean checkAfterTo
commit ? COMMITTED : ABORTED,
coordinatorId,
tx.commitPartition(),
commit ? testCluster.clockServices.get(coord.name()).now() : null
commit ? testCluster.clockServices.get(coord.name()).now() : null,
null
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,8 @@ private void replicaTouch(UUID txId, UUID txCoordinatorId, HybridTimestamp commi
full ? COMMITTED : PENDING,
txCoordinatorId,
old == null ? null : old.commitPartitionId(),
full ? commitTimestamp : null
full ? commitTimestamp : null,
old == null ? null : old.tx()
));
}

Expand All @@ -772,6 +773,7 @@ private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp c
old == null ? null : old.txCoordinatorId(),
old == null ? partId : old.commitPartitionId(),
commit ? commitTimestamp : null,
old == null ? null : old.tx(),
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp()
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,8 @@ private CompletableFuture<?> processRequest(ReplicaRequest request, @Nullable Bo
PENDING,
req.coordinatorId(),
req.commitPartitionId().asTablePartitionId(),
null
null,
old == null ? null : old.tx()
));
}
}
Expand Down Expand Up @@ -811,7 +812,8 @@ private CompletableFuture<?> processOperationRequest(
PENDING,
req.coordinatorId(),
req.commitPartitionId().asTablePartitionId(),
null
null,
old == null ? null : old.tx()
));

var opId = new OperationId(senderId, req.timestamp().longValue());
Expand Down Expand Up @@ -3945,6 +3947,7 @@ private void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp
old == null ? null : old.txCoordinatorId(),
old == null ? null : old.commitPartitionId(),
txState == COMMITTED ? commitTimestamp : null,
old == null ? null : old.tx(),
old == null ? null : old.initialVacuumObservationTimestamp(),
old == null ? null : old.cleanupCompletionTimestamp()
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,13 @@ public void testTxStateReplicaRequestEmptyState() throws Exception {
doAnswer(invocation -> {
UUID txId = invocation.getArgument(4);

txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(
ABORTED,
localNode.id(),
commitPartitionId,
null,
null
));

return nullCompletedFuture();
}).when(txManager).finish(any(), any(), anyBoolean(), any(), any());
Expand Down Expand Up @@ -776,7 +782,7 @@ void testExecuteRequestOnFinishedTx(TxState txState, RequestType requestType) {
UUID txId = newTxId();

txStateStorage.putForRebalance(txId, new TxMeta(txState, singletonList(grpId), null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(txState, null, null, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(txState, null, null, null, null));

BinaryRow testRow = binaryRow(0);

Expand Down Expand Up @@ -911,7 +917,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() thr

pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now()));
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, localNode.id(), commitPartitionId, clock.now(), null));

CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey);

Expand All @@ -929,7 +935,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() throw

pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), commitPartitionId, null, null));

CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey);

Expand All @@ -948,7 +954,7 @@ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() throw

pkStorage().put(testBinaryRow, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null));
txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED, localNode.id(), commitPartitionId, null, null));

CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey);

Expand Down Expand Up @@ -1618,7 +1624,7 @@ private void testWriteIntentOnPrimaryReplica(

// Imitation of tx commit.
txStateStorage.putForRebalance(txId, new TxMeta(COMMITTED, new ArrayList<>(), now));
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, now));
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, now, null));

CompletableFuture<?> replicaCleanupFut = partitionReplicaListener.invoke(
TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
Expand Down Expand Up @@ -2876,7 +2882,7 @@ private static ReadOnlyDirectMultiRowReplicaRequest readOnlyDirectMultiRowReplic
private void cleanup(UUID txId) {
HybridTimestamp commitTs = clock.now();

txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, commitTs));
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED, UUID.randomUUID(), commitPartitionId, commitTs, null));

WriteIntentSwitchReplicaRequest message = TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2171,7 +2171,8 @@ public void testWriteIntentResolutionFallbackToCommitPartitionPath() {
old.txState(),
new UUID(1, 2),
old.commitPartitionId(),
old.commitTimestamp()
old.commitTimestamp(),
old == null ? null : old.tx()
));
}

Expand Down
Loading

0 comments on commit 56f674a

Please sign in to comment.