Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix transaction panics in endTransaction with "transaction not found" #529

Merged
merged 10 commits into from
Jan 3, 2024
4 changes: 2 additions & 2 deletions transaction-ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

[ballerina]
dependencies-toml-version = "2"
distribution-version = "2201.8.0-20230726-145300-b2bdf796"
distribution-version = "2201.8.0-20230908-135700-74a59dff"

[[package]]
org = "ballerina"
Expand Down Expand Up @@ -64,7 +64,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "http"
version = "2.10.0"
version = "2.10.3"
dependencies = [
{org = "ballerina", name = "auth"},
{org = "ballerina", name = "cache"},
Expand Down
151 changes: 90 additions & 61 deletions transaction-ballerina/commons.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/cache;
import ballerina/http;
import ballerina/lang.'transaction as lang_trx;
import ballerina/lang.'value as value;
import ballerina/log;
import ballerina/uuid;
import ballerina/task;
import ballerina/time;
import ballerina/lang.'transaction as lang_trx;
import ballerina/lang.'value as value;
import ballerina/uuid;

# ID of the local participant used when registering with the initiator.
string localParticipantId = uuid:createType4AsString();
Expand All @@ -30,7 +29,8 @@
map<TwoPhaseCommitTransaction> initiatedTransactions = {};

# This map is used for caching transaction that are this Ballerina instance participates in.
@tainted map<TwoPhaseCommitTransaction> participatedTransactions = {};
@tainted
map<TwoPhaseCommitTransaction> participatedTransactions = {};

# This cache is used for caching HTTP connectors against the URL, since creating connectors is expensive.
cache:Cache httpClientCache = new;
Expand All @@ -55,8 +55,8 @@
while (i < participatedTransactionsArr.length()) {
var twopcTxn = participatedTransactionsArr[i];
i += 1;
//TODO: commenting due to a caching issue
//foreach var twopcTxn in participatedTransactions {
//TODO: commenting due to a caching issue
//foreach var twopcTxn in participatedTransactions {
final string participatedTxnId = getParticipatedTransactionId(twopcTxn.transactionId,
twopcTxn.transactionBlockId);
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= 120d) {
Expand Down Expand Up @@ -85,20 +85,23 @@
}
}
}
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal> 600) {
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal>600) {
// We don't want dead transactions hanging around
removeParticipatedTransaction(participatedTxnId);
}
}
}
worker w2 returns () {
TwoPhaseCommitTransaction[] initiatedTransactionsArr = initiatedTransactions.toArray();
TwoPhaseCommitTransaction[] initiatedTransactionsArr;
lock {
initiatedTransactionsArr = initiatedTransactions.toArray();
}
int i = 0;
while(i < initiatedTransactionsArr.length()) {
while (i < initiatedTransactionsArr.length()) {
var twopcTxn = initiatedTransactionsArr[i];
i += 1;
//TODO:commenting due to a caching issue
//foreach var twopcTxn in initiatedTransactions {
//TODO:commenting due to a caching issue
//foreach var twopcTxn in initiatedTransactions {
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal>120) {
if (twopcTxn.state != TXN_STATE_ABORTED) {
// Commit the transaction since prepare hasn't been received
Expand All @@ -114,7 +117,7 @@
}
}
}
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal> 600) {
if (time:utcDiffSeconds(time:utcNow(), twopcTxn.createdTime) >= <decimal>600) {
// We don't want dead transactions hanging around
removeInitiatedTransaction(twopcTxn.transactionId);
}
Expand All @@ -125,7 +128,6 @@
return value;
}


function isRegisteredParticipant(string participantId, map<Participant> participants) returns boolean {
return participants.hasKey(participantId);
}
Expand All @@ -135,8 +137,8 @@
while (i < coordinationTypes.length()) {
var coordType = coordinationTypes[i];
i += 1;
//TODO:commenting due to caching issue;
//foreach var coordType in coordinationTypes {
//TODO:commenting due to caching issue;
//foreach var coordType in coordinationTypes {
if (coordinationType == coordType) {
return true;
}
Expand All @@ -148,27 +150,27 @@
if (p is RemoteProtocol) {
return p.name;
} else {
return <string> p.name;
return <string>p.name;
}
}

function protocolCompatible(string coordinationType, UProtocol?[] participantProtocols) returns boolean {
boolean participantProtocolIsValid = false;
string[] validProtocols = coordinationTypeToProtocolsMap[coordinationType] ?: [];
int i = 0;
while ( i < participantProtocols.length()) {
while (i < participantProtocols.length()) {
var p = participantProtocols[i];
i += 1;
//TODO: commenting due to a caching issue
//foreach var p in participantProtocols {
//TODO: commenting due to a caching issue
//foreach var p in participantProtocols {
if (p is UProtocol) {
UProtocol participantProtocol = p;
int j = 0;
while (j < validProtocols.length()) {
var validProtocol = validProtocols[j];
j += 1;
//TODO: commenting due to a caching issue
//foreach var validProtocol in validProtocols {
//TODO: commenting due to a caching issue
//foreach var validProtocol in validProtocols {
if (protoName(participantProtocol) == validProtocol) {
participantProtocolIsValid = true;
break;
Expand All @@ -188,11 +190,12 @@

function respondToBadRequest(http:Caller ep, string msg) {
log:printError(msg);
http:Response res = new; res.statusCode = http:STATUS_BAD_REQUEST;
RequestError requestError = {errorMessage:msg};
http:Response res = new;
res.statusCode = http:STATUS_BAD_REQUEST;
RequestError requestError = {errorMessage: msg};

Check warning on line 195 in transaction-ballerina/commons.bal

View check run for this annotation

Codecov / codecov/patch

transaction-ballerina/commons.bal#L194-L195

Added lines #L194 - L195 were not covered by tests
var resPayload = requestError.cloneWithType(JsonTypedesc);
if (resPayload is json) {
res.setJsonPayload(<@untainted json> resPayload);
res.setJsonPayload(<@untainted json>resPayload);
var resResult = ep->respond(res);
if (resResult is error) {
log:printError("Could not send Bad Request error response to caller", 'error = resResult);
Expand Down Expand Up @@ -220,7 +223,7 @@
# corresponding to the coordinationType will also be created and stored as an initiated transaction.
#
# + coordinationType - The type of the coordination relevant to the transaction block for which this TransactionContext
# is being created for.
# is being created for.
# + transactionBlockId - The ID of the transaction block.
# + return - TransactionContext if the coordination type is valid or an error in case of an invalid coordination type.
function createTransactionContext(string coordinationType, string transactionBlockId) returns TransactionContext|error {
Expand All @@ -229,15 +232,17 @@
error err = error(msg);
return err;
} else {
TwoPhaseCommitTransaction txn = new(uuid(), transactionBlockId, coordinationType = coordinationType);
TwoPhaseCommitTransaction txn = new (uuid(), transactionBlockId, coordinationType = coordinationType);
string txnId = txn.transactionId;
txn.isInitiated = true;
initiatedTransactions[txnId] = txn;
lock {
initiatedTransactions[txnId] = txn;
}
TransactionContext txnContext = {
transactionId:txnId,
transactionBlockId:transactionBlockId,
coordinationType:coordinationType,
registerAtURL:"http://" + value:toString(coordinatorHost) + ":" + value:toString(coordinatorPort) +
transactionId: txnId,
transactionBlockId: transactionBlockId,
coordinationType: coordinationType,
registerAtURL: "http://" + value:toString(coordinatorHost) + ":" + value:toString(coordinatorPort) +
initiatorCoordinatorBasePath + "/" + transactionBlockId + registrationPath
};
return txnContext;
Expand All @@ -254,38 +259,46 @@
# + return - TransactionContext if the registration is successul or an error in case of a failure.
function registerLocalParticipantWithInitiator(string transactionId, string transactionBlockId, string registerAtURL)
returns TransactionContext|error {

final string trxId = transactionId;
final string participantId = getParticipantId(transactionBlockId);
//TODO: Protocol name should be passed down from the transaction statement
LocalProtocol participantProtocol = {name:PROTOCOL_DURABLE};
var initiatedTxn = initiatedTransactions[transactionId];
LocalProtocol participantProtocol = {name: PROTOCOL_DURABLE};
TwoPhaseCommitTransaction? initiatedTxn;
lock {
initiatedTxn = initiatedTransactions[transactionId];
}
if (initiatedTxn is ()) {
return error lang_trx:Error("Transaction-Unknown. Invalid TID:" + transactionId);
} else {
if (isRegisteredParticipant(participantId, initiatedTxn.participants)) { // Already-Registered
log:printDebug("Already-Registered. TID:" + trxId + ", participant ID:" + participantId);
TransactionContext txnCtx = {
transactionId:transactionId, transactionBlockId:transactionBlockId,
coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL
transactionId: transactionId,
transactionBlockId: transactionBlockId,
coordinationType: TWO_PHASE_COMMIT,
registerAtURL: registerAtURL
};
return txnCtx;
} else if (!protocolCompatible(initiatedTxn.coordinationType, [participantProtocol])) { // Invalid-Protocol
return error lang_trx:Error("Invalid-Protocol in local participant. TID:" + transactionId + ",participantID:" +
participantId);
} else {
//Set initiator protocols
TwoPhaseCommitTransaction participatedTxn = new(transactionId, transactionBlockId);
TwoPhaseCommitTransaction participatedTxn = new (transactionId, transactionBlockId);
//Protocol initiatorProto = {name: PROTOCOL_DURABLE, transactionBlockId:transactionBlockId};
//participatedTxn.coordinatorProtocols = [initiatorProto];

LocalParticipant participant = new(participantId, participatedTxn, [participantProtocol]);
LocalParticipant participant = new (participantId, participatedTxn, [participantProtocol]);
initiatedTxn.participants[participantId] = participant;

string participatedTxnId = getParticipatedTransactionId(transactionId, transactionBlockId);
participatedTransactions[participatedTxnId] = participatedTxn;
TransactionContext txnCtx = {transactionId:transactionId, transactionBlockId:transactionBlockId,
coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL};
TransactionContext txnCtx = {
transactionId: transactionId,
transactionBlockId: transactionBlockId,
coordinationType: TWO_PHASE_COMMIT,
registerAtURL: registerAtURL
};
log:printDebug("Registered local participant: " + participantId + " for transaction:" + trxId);
return txnCtx;
}
Expand All @@ -299,24 +312,34 @@
}
}

function hasInitiatedTransaction(string transactionId) returns boolean {
lock {
return initiatedTransactions.hasKey(transactionId);
}
}

function removeInitiatedTransaction(string transactionId) {
var removed = trap initiatedTransactions.remove(transactionId);
if (removed is error) {
panic error lang_trx:Error("Removing initiated transaction: " + transactionId + " failed");
lock {
var removed = trap initiatedTransactions.remove(transactionId);
if (removed is error) {
panic error lang_trx:Error("Removing initiated transaction: " + transactionId + " failed");
}
}
}

function getInitiatorClient(string registerAtURL) returns InitiatorClientEP {
InitiatorClientEP initiatorEP;
if (httpClientCache.hasKey(registerAtURL)) {
return <InitiatorClientEP> checkpanic httpClientCache.get(registerAtURL);
return <InitiatorClientEP>checkpanic httpClientCache.get(registerAtURL);
} else {
lock {
if (httpClientCache.hasKey(registerAtURL)) {
return <InitiatorClientEP> checkpanic httpClientCache.get(registerAtURL);
return <InitiatorClientEP>checkpanic httpClientCache.get(registerAtURL);
}
initiatorEP = new({ registerAtURL: registerAtURL, timeout: 15,
retryConfig: { count: 2, interval: 5 }
initiatorEP = new ({
registerAtURL: registerAtURL,
timeout: 15,
retryConfig: {count: 2, interval: 5}

Check warning on line 342 in transaction-ballerina/commons.bal

View check run for this annotation

Codecov / codecov/patch

transaction-ballerina/commons.bal#L341-L342

Added lines #L341 - L342 were not covered by tests
});
cache:Error? result = httpClientCache.put(registerAtURL, initiatorEP);
if (result is cache:Error) {
Expand All @@ -330,15 +353,17 @@

function getParticipant2pcClient(string participantURL) returns Participant2pcClientEP {
Participant2pcClientEP participantEP;
if (httpClientCache.hasKey(<@untainted> participantURL)) {
return <Participant2pcClientEP> checkpanic httpClientCache.get(<@untainted>participantURL);
if (httpClientCache.hasKey(<@untainted>participantURL)) {
return <Participant2pcClientEP>checkpanic httpClientCache.get(<@untainted>participantURL);
} else {
lock {
if (httpClientCache.hasKey(<@untainted> participantURL)) {
return <Participant2pcClientEP> checkpanic httpClientCache.get(<@untainted>participantURL);
if (httpClientCache.hasKey(<@untainted>participantURL)) {
return <Participant2pcClientEP>checkpanic httpClientCache.get(<@untainted>participantURL);
}
participantEP = new({ participantURL: participantURL,
timeout: 15, retryConfig: { count: 2, interval: 5 }
participantEP = new ({
participantURL: participantURL,
timeout: 15,
retryConfig: {count: 2, interval: 5}

Check warning on line 366 in transaction-ballerina/commons.bal

View check run for this annotation

Codecov / codecov/patch

transaction-ballerina/commons.bal#L365-L366

Added lines #L365 - L366 were not covered by tests
});
cache:Error? result = httpClientCache.put(participantURL, participantEP);
if (result is cache:Error) {
Expand All @@ -352,13 +377,13 @@

# Registers a participant with the initiator's coordinator. This function will be called by the participant.
#
# + transactionId - Global transaction ID to which this participant is registering with.
# + transactionId - Global transaction ID to which this participant is registering with.
# + transactionBlockId - The local ID of the transaction block on the participant.
# + registerAtURL - The URL of the coordinator.
# + participantProtocols - The coordination protocals supported by the participant.
# + return - TransactionContext if the registration is successful or an error in case of a failure.
function registerParticipantWithRemoteInitiator(string transactionId, string transactionBlockId,
string registerAtURL, RemoteProtocol[] participantProtocols)
string registerAtURL, RemoteProtocol[] participantProtocols)
returns TransactionContext|error {

InitiatorClientEP initiatorEP = getInitiatorClient(registerAtURL);
Expand All @@ -368,8 +393,10 @@
if (participatedTransactions.hasKey(participatedTxnId)) {
log:printDebug("Already registered with initiator for transaction:" + participatedTxnId);
TransactionContext txnCtx = {
transactionId:transactionId, transactionBlockId:transactionBlockId,
coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL
transactionId: transactionId,
transactionBlockId: transactionBlockId,
coordinationType: TWO_PHASE_COMMIT,

Check warning on line 398 in transaction-ballerina/commons.bal

View check run for this annotation

Codecov / codecov/patch

transaction-ballerina/commons.bal#L398

Added line #L398 was not covered by tests
registerAtURL: registerAtURL
};
return txnCtx;
}
Expand All @@ -385,12 +412,14 @@
return error lang_trx:Error(msg);
} else {
RemoteProtocol[] coordinatorProtocols = result.coordinatorProtocols;
TwoPhaseCommitTransaction twopcTxn = new(transactionId, transactionBlockId);
TwoPhaseCommitTransaction twopcTxn = new (transactionId, transactionBlockId);
twopcTxn.coordinatorProtocols = toProtocolArray(coordinatorProtocols);
participatedTransactions[participatedTxnId] = twopcTxn;
TransactionContext txnCtx = {
transactionId:transactionId, transactionBlockId:transactionBlockId,
coordinationType:TWO_PHASE_COMMIT, registerAtURL:registerAtURL
transactionId: transactionId,
transactionBlockId: transactionBlockId,
coordinationType: TWO_PHASE_COMMIT,

Check warning on line 421 in transaction-ballerina/commons.bal

View check run for this annotation

Codecov / codecov/patch

transaction-ballerina/commons.bal#L421

Added line #L421 was not covered by tests
registerAtURL: registerAtURL
};
final string trxId = transactionId;
log:printDebug("Registered with coordinator for transaction: " + trxId);
Expand Down
Loading
Loading