Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: modify XA mode pre commit transaction from commit phase to before close phase #7102

Open
wants to merge 17 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,26 +176,36 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
commit();
}
} else {
if (xaActive) {
throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
}
// Start a XA branch
long branchId;
try {
// 1. register branch to TC then get the branch message
branchRegisterTime = System.currentTimeMillis();
branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
null);
} catch (TransactionException te) {
cleanXABranchContext();
throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
int flags;
if (this.xaBranchXid != null && currentAutoCommitStatus) {
flags = XAResource.TMJOIN;
} else {
if (xaActive) {
throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
}
if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
flags = SeataXAResource.ORATRANSLOOSE;
} else {
flags = XAResource.TMNOFLAGS;
}
// Start a XA branch
long branchId;
try {
// 1. register branch to TC then get the branch message
branchRegisterTime = System.currentTimeMillis();
branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
null);
} catch (TransactionException te) {
cleanXABranchContext();
throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
}
// 2. build XA-Xid with xid and branchId
this.xaBranchXid = XAXidBuilder.build(xid, branchId);
}
// 2. build XA-Xid with xid and branchId
this.xaBranchXid = XAXidBuilder.build(xid, branchId);
// Keep the Connection if necessary
keepIfNecessary();
try {
start();
start(flags);
} catch (XAException e) {
cleanXABranchContext();
throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
Expand Down Expand Up @@ -223,34 +233,12 @@ public synchronized void commit() throws SQLException {
throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
}
try {
// XA End: Success
try {
end(XAResource.TMSUCCESS);
} catch (SQLException sqle) {
// Rollback immediately before the XA Branch Context is deleted.
String xaBranchXid = this.xaBranchXid.toString();
rollback();
throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle);
}
long now = System.currentTimeMillis();
checkTimeout(now);
setPrepareTime(now);
int prepare = xaResource.prepare(xaBranchXid);
// Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022),
// only Oracle has read-only optimization; the others do not provide read-only feedback.
// Therefore, the database type check can be eliminated here.
if (prepare == XAResource.XA_RDONLY) {
// Branch Report to TC: RDONLY
reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
}
} catch (XAException xe) {
// Branch Report to TC: Failed
reportStatusToTC(BranchStatus.PhaseOne_Failed);
throw new SQLException(
"Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
.getMessage(), xe);
xaEnd(xaBranchXid, XAResource.TMSUCCESS);
xiaoxiangyeyu0 marked this conversation as resolved.
Show resolved Hide resolved
} catch (XAException e) {
throw new SQLException("Failed to end(TMSUCCESS) xa branch on " + xid + "-" + xaBranchXid.getBranchId()
+ " since " + e.getMessage(), e);
} finally {
cleanXABranchContext();
xaActive = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个改成false后,如果通过setAutoCommit触发commit时要怎么办?
After changing this to false, what if the commit is triggered by setAutoCommit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里置为false是因为,自动提交事务第二次调用setAutoCommit时,if (xaActive)这里会调用commit方法,currentAutoCommitStatus在第一次被置为false,进入会继续执行end方法就会报错。
现在commit方法的xaActive = false;逻辑已去掉,setAutoCommit新加了一层判断 if (xaActive && !xaEnded)

}
}

Expand Down Expand Up @@ -280,13 +268,9 @@ public void rollback() throws SQLException {
}
}

private synchronized void start() throws XAException, SQLException {
private synchronized void start(int flags) throws XAException, SQLException {
// 3. XA Start
if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE);
} else {
xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
}
xaResource.start(this.xaBranchXid, flags);

try {
termination();
Expand Down Expand Up @@ -324,13 +308,41 @@ private void checkTimeout(Long now) throws XAException {

@Override
public synchronized void close() throws SQLException {
rollBacked = false;
if (isHeld() && shouldBeHeld()) {
// if kept by a keeper, just hold the connection.
return;
try {
if (xaEnded) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end后的xa事务应该是可以join的,只要没有进入prepare阶段,如果我手动commit或rollback,再接着发sql,这里被标识成了xaended,如何进行prepare?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

接着发sql最终还是会调用commit方法,xaEnded会被置为true,close时依然会进行prepare

termination();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我认为prepare这块的逻辑不需要termination
I don't think the logic of preparing this piece needs termination

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已调整commit从调用xaEnd改为end方法,这里已去掉

long now = System.currentTimeMillis();
checkTimeout(now);
setPrepareTime(now);
int prepare = xaResource.prepare(xaBranchXid);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果走了rollback的事务不需要prepare,直接上报一阶段失败即可
If the transaction that has gone rollback does not need to be prepared, it can directly report the failure of the first stage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollback方法的finally会调用cleanXABranchContext,这里会把xaEnded置为false

// Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022),
// only Oracle has read-only optimization; the others do not provide read-only feedback.
// Therefore, the database type check can be eliminated here.
if (prepare == XAResource.XA_RDONLY) {
// Branch Report to TC: RDONLY
reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
}
}
} catch (SQLException sqle) {
// Rollback immediately before the XA Branch Context is deleted.
String xaBranchXid = this.xaBranchXid.toString();
rollback();
throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle);
} catch (XAException xe) {
// Branch Report to TC: Failed
reportStatusToTC(BranchStatus.PhaseOne_Failed);
throw new SQLException(
"Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
.getMessage(), xe);
} finally {
rollBacked = false;
if (isHeld() && shouldBeHeld()) {
// if kept by a keeper, just hold the connection.
} else {
cleanXABranchContext();
originalConnection.close();
}
}
cleanXABranchContext();
originalConnection.close();
}

protected synchronized void closeForce() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testXABranchCommit() throws Throwable {
connectionProxyXA.commit();

Mockito.verify(xaResource).end(any(Xid.class), any(Integer.class));
Mockito.verify(xaResource).prepare(any(Xid.class));
Mockito.verify(xaResource, times(0)).prepare(any(Xid.class));
}

@Test
Expand Down
Loading