diff --git a/bvm/ballerina-runtime/build.gradle b/bvm/ballerina-runtime/build.gradle index 9cc79888cf8a..2a72c674af89 100644 --- a/bvm/ballerina-runtime/build.gradle +++ b/bvm/ballerina-runtime/build.gradle @@ -39,6 +39,8 @@ dependencies { implementation project(':identifier-util') testImplementation libs.testng + testImplementation libs.mockito.core + testImplementation libs.mockito.testng } diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/errors/ErrorCodes.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/errors/ErrorCodes.java index 1a1d8e966fd8..6f09352514d7 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/errors/ErrorCodes.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/internal/errors/ErrorCodes.java @@ -158,7 +158,17 @@ public enum ErrorCodes implements DiagnosticCode { NO_MESSAGE_ERROR("no.worker.message.received", "RUNTIME_0128"), FUNCTION_ALREADY_CALLED("function.already.called", "RUNTIME_0129"), INVALID_FUNCTION_INVOCATION_BEFORE_MODULE_INIT("invalid.function.call.before.module.init", "RUNTIME_0130"), - INVALID_TUPLE_MEMBER_SIZE("invalid.tuple.member.size", "RUNTIME_0131"); + INVALID_TUPLE_MEMBER_SIZE("invalid.tuple.member.size", "RUNTIME_0131"), + + // transaction recovery errors + TRANSACTION_INVALID_CHECKPOINT_VALUE("transaction.invalid.checkpoint.value", "RUNTIME_0131"), + TRANSACTION_IN_HUERISTIC_STATE("transaction.in.heuristic.state", "RUNTIME_0132"), + TRANSACTION_IN_HAZARD_STATE("transaction.in.hazard.state", "RUNTIME_0133"), + TRANSACTION_IN_MIXED_STATE("transaction.in.mixed.state", "RUNTIME_0134"), + TRANSACTION_STARTUP_RECOVERY_FAILED("transaction.startup.recovery.failed", "RUNTIME_0135"), + TRANSACTION_CANNOT_CREATE_LOG_FILE("transaction.cannot.create.log.file", "RUNTIME_0136"), + TRANSACTION_CANNOT_COLLECT_XIDS_IN_RESOURCE("transaction.cannot.collect.xids.in.resource", "RUNTIME_0137"), + TRANSACTION_CANNOT_PARSE_LOG_RECORD("transaction.cannot.parse.log.record", "RUNTIME_0138"); private final String errorMsgKey; private final String errorCode; diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/FileRecoveryLog.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/FileRecoveryLog.java new file mode 100644 index 000000000000..7bce25d7ee38 --- /dev/null +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/FileRecoveryLog.java @@ -0,0 +1,307 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.transactions; + +import io.ballerina.runtime.internal.diagnostics.RuntimeDiagnosticLog; +import io.ballerina.runtime.internal.errors.ErrorCodes; +import io.ballerina.runtime.internal.util.RuntimeUtils; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static io.ballerina.runtime.transactions.TransactionConstants.ERROR_MESSAGE_PREFIX; +import static io.ballerina.runtime.transactions.TransactionConstants.NO_CHECKPOINT_INTERVAL; + +/** + * {@code FileRecoveryLog} the file recovery log for transaction recovery. + * + * @since 2201.9.0 + */ +public final class FileRecoveryLog implements RecoveryLog { + + private static final String LOG_FILE_NUMBER = "(\\d+)"; + private static final String LOG_FILE_EXTENSION = ".log"; + private final String baseFileName; + private final Path recoveryLogDir; + private final int checkpointInterval; + private final boolean deleteOldLogs; + private int numOfPutsSinceLastCheckpoint; + private File logFile; + private FileLockAndChannel fileLockAndChannel; + private Map existingLogs; + private static final PrintStream stderr = System.err; + private final RuntimeDiagnosticLog diagnosticLog = new RuntimeDiagnosticLog(); + private static FileRecoveryLog instance; + + /** + * Initializes a new FileRecoveryLog instance with the given base file name. + * + * @param baseFileName The base name for the recovery log files. + * @param checkpointInterval The interval at which to write checkpoints to the log file. + * @param recoveryLogDir The directory to store the recovery log files in. + * @param deleteOldLogs Whether to delete old log files when creating a new one. + */ + private FileRecoveryLog(String baseFileName, int checkpointInterval, Path recoveryLogDir, boolean deleteOldLogs) { + this.baseFileName = baseFileName; + this.recoveryLogDir = recoveryLogDir; + this.deleteOldLogs = deleteOldLogs; + this.checkpointInterval = checkpointInterval; + this.existingLogs = new HashMap<>(); + this.numOfPutsSinceLastCheckpoint = 0; + } + + public static FileRecoveryLog getInstance(String baseFileName, int checkpointInterval, Path recoveryLogDir, + boolean deleteOldLogs) { + if (instance != null) { + throw new IllegalStateException("instance already exists"); + } + instance = new FileRecoveryLog(baseFileName, checkpointInterval, recoveryLogDir, deleteOldLogs); + instance.logFile = instance.createNextVersion(); + return instance; + } + + /** + * Creates the next version of the recovery log file, cleaning up the previous one. + * + * @return The newly created log file. + */ + private File createNextVersion() { + int latestVersion = findLatestVersion(); + File oldFile = recoveryLogDir.resolve(baseFileName + latestVersion + LOG_FILE_EXTENSION).toFile(); + if (oldFile.exists()) { + existingLogs = readLogsFromFile(oldFile); + if (deleteOldLogs) { + File[] matchingLogfiles = getLogFilesInDirectory(); + for (File file : matchingLogfiles) { + file.delete(); + } + } + } + File newFile = recoveryLogDir.resolve(baseFileName + (latestVersion + 1) + LOG_FILE_EXTENSION).toFile(); + try { + Files.createDirectories(recoveryLogDir); // create directory if not exists + newFile.createNewFile(); + fileLockAndChannel = initAppendChannel(newFile); + if (existingLogs == null) { + return newFile; + } + // write existing unfinished logs to the new file + cleanUpFinishedLogs(); + putAll(existingLogs); + existingLogs.clear(); + } catch (IOException e) { + stderr.println(ERROR_MESSAGE_PREFIX + " failed to create recovery log file in " + recoveryLogDir + ": " + + e.getMessage()); + } + return newFile; + } + + /** + * Finds the latest version of the recovery log file. + * + * @return The latest version of the recovery log file. + */ + private int findLatestVersion() { + int latestVersion = 0; + File[] matchingFiles = getLogFilesInDirectory(); + if (matchingFiles == null) { + return latestVersion; + } + for (File file : matchingFiles) { + String fileName = file.getName(); + int version = Integer.parseInt( + fileName.replaceAll(baseFileName, "").replaceAll(LOG_FILE_EXTENSION, "")); + if (version > latestVersion) { + latestVersion = version; + } + } + return latestVersion; + } + + private File[] getLogFilesInDirectory() { + return recoveryLogDir.toFile().listFiles( + (dir, name) -> name.matches(baseFileName + LOG_FILE_NUMBER + LOG_FILE_EXTENSION + "$") + ); + } + + /** + * Initializes the append channel for the given file. + * + * @param file The file to initialize the append channel for. + */ + private FileLockAndChannel initAppendChannel(File file) { + if (fileLockAndChannel == null) { + synchronized (this) { + if (fileLockAndChannel == null) { + try { + FileChannel appendChannel = FileChannel.open(file.toPath(), StandardOpenOption.APPEND); + FileLock lock = appendChannel.tryLock(); + if (lock == null) { + stderr.println( + ERROR_MESSAGE_PREFIX + " failed to acquire lock on recovery log file " + + file.toPath()); + } else { + fileLockAndChannel = new FileLockAndChannel(lock, appendChannel); + } + } catch (IOException e) { + stderr.println( + ERROR_MESSAGE_PREFIX + " failed to acquire lock on recovery log file " + file.toPath() + + ": " + e.getMessage()); + } + } + } + } + return fileLockAndChannel; + } + + @Override + public void put(TransactionLogRecord trxRecord) { + boolean force = !(trxRecord.getTransactionState().equals(RecoveryState.TERMINATED)); // lazy write + writeToFile(trxRecord.getTransactionLogRecordString(), force); + if (checkpointInterval != NO_CHECKPOINT_INTERVAL) { + writeCheckpointIfNeeded(); + numOfPutsSinceLastCheckpoint++; + } + } + + @Override + public void putAll(Map trxRecords) { + for (TransactionLogRecord trxRecord : trxRecords.values()) { + writeToFile(trxRecord.getTransactionLogRecordString(), true); + } + } + + public Map getPendingLogs() { + Map pendingTransactions = new HashMap<>(); + Map transactionLogs = readLogsFromFile(logFile); + for (Map.Entry entry : transactionLogs.entrySet()) { + String trxId = entry.getKey(); + TransactionLogRecord trxRecord = entry.getValue(); + if (!trxRecord.isCompleted()) { + pendingTransactions.put(trxId, trxRecord); + } + } + return pendingTransactions; + } + + /** + * Write a transaction log entry to the recovery log file. + * + * @param str the log entry to write + */ + private void writeToFile(String str, boolean force) { + if (fileLockAndChannel.appendChannel == null || !fileLockAndChannel.appendChannel.isOpen()) { + fileLockAndChannel = initAppendChannel(logFile); + } + byte[] bytes = str.getBytes(); + try { + fileLockAndChannel.appendChannel.write(ByteBuffer.wrap(bytes)); + fileLockAndChannel.appendChannel.force(force); + } catch (IOException e) { + stderr.println(ERROR_MESSAGE_PREFIX + " failed to write to recovery log file " + logFile.toPath() + ": " + + e.getMessage()); + } + } + + /** + * Reads the transaction logs from the log file. + * + * @param file The file to read the transaction logs from. + * @return The transaction logs read from the file. + */ + private Map readLogsFromFile(File file) { + if (!file.exists() || file.length() == 0) { + return Collections.emptyMap(); + } + if (fileLockAndChannel != null) { + closeEverything(); + } + Map logMap = new HashMap<>(); + try (BufferedReader reader = new BufferedReader(new FileReader(file))) { + String line; + while ((line = reader.readLine()) != null) { + TransactionLogRecord transactionLogRecord = TransactionLogRecord.parseTransactionLogRecord(line); + if (transactionLogRecord == null) { + diagnosticLog.error(ErrorCodes.TRANSACTION_CANNOT_PARSE_LOG_RECORD, null, line); + continue; + } + logMap.put(transactionLogRecord.getCombinedId(), transactionLogRecord); + } + } catch (IOException e) { + stderr.println(ERROR_MESSAGE_PREFIX + " failed to read the recovery log file " + file.toPath() + ": " + + e.getMessage()); + } + if (!diagnosticLog.getDiagnosticList().isEmpty()) { + RuntimeUtils.handleDiagnosticErrors(diagnosticLog); + } + return logMap; + } + + private void cleanUpFinishedLogs() { + if (existingLogs.isEmpty()) { + return; + } + // Safely remove the completed entries + existingLogs.entrySet().removeIf(entry -> entry.getValue().isCompleted()); + } + + private void writeCheckpointIfNeeded() { + if (numOfPutsSinceLastCheckpoint >= checkpointInterval) { + numOfPutsSinceLastCheckpoint = 0; // need to set here otherwise it will just keep creating new files + logFile = createNextVersion(); + } + } + + @Override + public void close() { + closeEverything(); + } + + private void closeEverything() { + try { + fileLockAndChannel.close(); + } catch (IOException e) { + // nothing to do here. + } + } + + public record FileLockAndChannel(FileLock lock, FileChannel appendChannel) { + public void close() throws IOException { + if (lock != null) { + lock.release(); + } + if (appendChannel != null) { + appendChannel.close(); + } + } + } + +} diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/InMemoryRecoveryLog.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/InMemoryRecoveryLog.java new file mode 100644 index 000000000000..0c640c62a3de --- /dev/null +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/InMemoryRecoveryLog.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.transactions; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static io.ballerina.runtime.transactions.TransactionConstants.IN_MEMORY_CHECKPOINT_INTERVAL; + +/** + * {@code InMemoryRecoveryLog} the in-memory recovery log for transaction recovery. + * + * @since 2201.9.0 + */ +public final class InMemoryRecoveryLog implements RecoveryLog { + + private final Map transactionLogs = new ConcurrentHashMap<>(); + private int numOfPutsSinceLastCheckpoint = 0; + private static InMemoryRecoveryLog instance; + + private InMemoryRecoveryLog() { + } + + public static InMemoryRecoveryLog getInstance() { + if (instance == null) { + instance = new InMemoryRecoveryLog(); + } + return instance; + } + + @Override + public void put(TransactionLogRecord trxRecord) { + transactionLogs.put(trxRecord.getCombinedId(), trxRecord); + writeCheckpointIfNeeded(); + numOfPutsSinceLastCheckpoint++; + } + + @Override + public void putAll(Map trxRecords) { + transactionLogs.putAll(trxRecords); + } + + /** + * Write a checkpoint to the in-memory log (not needed if you don't need checkpoints). + */ + private void writeCheckpointIfNeeded() { + if (numOfPutsSinceLastCheckpoint >= IN_MEMORY_CHECKPOINT_INTERVAL) { + Map pendingTransactions = getFailedTransactions(); + clearAllLogs(); + transactionLogs.putAll(pendingTransactions); + } + } + + /** + * Retrieve all pending transactions from the in-memory log. + * + * @return Map of pending transactions + */ + public Map getFailedTransactions() { + Map failedTransactions = new ConcurrentHashMap<>(); + synchronized (transactionLogs) { + for (Map.Entry entry : transactionLogs.entrySet()) { + String trxId = entry.getKey(); + TransactionLogRecord trxRecord = entry.getValue(); + if (!trxRecord.isCompleted()) { + failedTransactions.put(trxId, trxRecord); + } + } + } + return failedTransactions; + } + + public Map getAllLogs() { + return transactionLogs; + } + + public void clearAllLogs() { + transactionLogs.clear(); + } + + @Override + public void close() { + } +} diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/LogManager.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/LogManager.java new file mode 100644 index 000000000000..201c85b077b0 --- /dev/null +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/LogManager.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.transactions; + +import java.nio.file.Path; +import java.util.Map; + +/** + * The log manager is responsible for managing the transaction logs. + * + * @since 2201.9.0 + */ +public class LogManager { + + private final FileRecoveryLog fileRecoveryLog; + private final InMemoryRecoveryLog inMemoryRecoveryLog; + private static LogManager instance; + + /** + * Create a log manager with the given base file name, checkpoint interval, recovery log directory, and delete old + * logs flag. + * + * @param baseFileName the base file name + * @param checkpointInterval the checkpoint interval + * @param recoveryLogDir the recovery log directory + * @param deleteOldLogs the delete old logs flag + */ + private LogManager(String baseFileName, int checkpointInterval, Path recoveryLogDir, boolean deleteOldLogs) { + this.fileRecoveryLog = + FileRecoveryLog.getInstance(baseFileName, checkpointInterval, recoveryLogDir, deleteOldLogs); + this.inMemoryRecoveryLog = InMemoryRecoveryLog.getInstance(); + init(); + } + + public static LogManager getInstance(String baseFileName, int checkpointInterval, Path recoveryLogDir, + boolean deleteOldLogs) { + if (instance == null) { + instance = new LogManager(baseFileName, checkpointInterval, recoveryLogDir, deleteOldLogs); + } + return instance; + } + + private void init() { + // Read pending transactions from the file recovery log and add them to the in-memory log. + Map pendingTransactions = fileRecoveryLog.getPendingLogs(); + inMemoryRecoveryLog.putAll(pendingTransactions); + } + + /** + * Write a log entry to both the in-memory log and log file. + * + * @param trxRecord the transaction log record + */ + public void put(TransactionLogRecord trxRecord) { + inMemoryRecoveryLog.put(trxRecord); + fileRecoveryLog.put(trxRecord); + } + + /** + * Get the failed transaction logs from the in-memory log. + * + * @return the failed transaction logs + */ + public Map getFailedTransactionLogs() { + return inMemoryRecoveryLog.getFailedTransactions(); + } + + /** + * Close the file recovery log releasing the file lock. + */ + public void close() { + fileRecoveryLog.close(); + inMemoryRecoveryLog.close(); + } +} diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryLog.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryLog.java new file mode 100644 index 000000000000..cb498bb923ce --- /dev/null +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryLog.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.transactions; + +import java.util.Map; + +/** + * The RecoveryLog interface for the recovery logs. + * + * @since 2201.9.0 + */ +public interface RecoveryLog { + + /** + * Write a log entry to the recovery log. + * + * @param trxRecord the transaction log record + */ + void put(TransactionLogRecord trxRecord); + + /** + * Inserts all the transaction log records into the current map. + * + * @param trxRecords A map of transaction log records to be inserted + */ + void putAll(Map trxRecords); + + /** + * Close the recovery log file. + */ + void close(); +} diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryManager.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryManager.java new file mode 100644 index 000000000000..2fa0dedf3f11 --- /dev/null +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryManager.java @@ -0,0 +1,315 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.transactions; + +import io.ballerina.runtime.internal.diagnostics.RuntimeDiagnosticLog; +import io.ballerina.runtime.internal.errors.ErrorCodes; +import io.ballerina.runtime.internal.util.RuntimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +/** + * {@code RecoveryManager} is responsible for recovering failed transactions and resources. + * + * @since 2201.9.0 + */ +public class RecoveryManager { + + private static final Logger log = LoggerFactory.getLogger(RecoveryManager.class); + private final Map failedTransactions; + private final Collection xaResources; + private final RuntimeDiagnosticLog diagnosticLog = new RuntimeDiagnosticLog(); + private static final PrintStream stderr = System.err; + + public RecoveryManager() { + this.failedTransactions = new HashMap<>(); + this.xaResources = new ArrayList<>(); + } + + /** + * Add a transaction log record to the list of records to recover. + * + * @param transactionLogRecords the record to add + */ + private void putFailedTransactionRecords(Map transactionLogRecords) { + failedTransactions.putAll(transactionLogRecords); + } + + /** + * Get the list of transaction log records that are waiting to be recovered. + * + * @return the list of transaction log records to recover + */ + public Map getPendingTransactionRecords() { + return this.failedTransactions; + } + + /** + * Add a xa resource to the list of resources to recover. Recoverable XAResources should be added from the relevant + * library side during their initialization. + * + * @param xaResource the resource to be recovered + */ + public void addXAResourceToRecover(XAResource xaResource) { + try { + for (XAResource xaResc : xaResources) { + if (xaResc.isSameRM(xaResource)) { + return; + } + } + } catch (XAException e) { + // can ignore, we are adding it to array anyway. + } + xaResources.add(xaResource); + } + + /** + * Perform a recovery pass to recover all failed transactions in xa resources. + * + * @return true if recovery pass is successful, false otherwise + */ + public boolean performRecoveryPass() { + boolean recoverSuccess = true; // assume success, until it is not + + // Get all the transaction records without terminated logs; + putFailedTransactionRecords( + TransactionResourceManager.getInstance().getLogManager().getFailedTransactionLogs()); + + Iterator> iterator = failedTransactions.entrySet().iterator(); + while (iterator.hasNext()) { + TransactionLogRecord logRecord = iterator.next().getValue(); + switch (logRecord.getTransactionState()) { + case PREPARING, COMMITTING, ABORTING, COMMITTED, ABORTED -> { + // if the transaction was in any of the terminating states, it means that the 2pc has initiated + // and, it has impacted the resources. Therefore, we need to recover the transaction accordingly. + Xid xid = XIDGenerator.createXID(logRecord.getCombinedId()); + boolean singleTrxRecoverSuccess = + recoverFailedTrxInAllResources(xid, logRecord.getTransactionState()); + if (singleTrxRecoverSuccess) { + // put a terminated log record to indicate that the transaction was recovered successfully + TransactionLogRecord terminatedRecord = new TransactionLogRecord( + logRecord.getTransactionId(), logRecord.getTransactionBlockId(), + RecoveryState.TERMINATED); + TransactionResourceManager.getInstance().getLogManager().put(terminatedRecord); + iterator.remove(); + } + recoverSuccess = recoverSuccess && singleTrxRecoverSuccess; + } + case MIXED, HAZARD -> { + // if the transaction was in any of the mixed or hazard states, it means that the transaction is + // not in a state we can handle and should be recovered manually, so we inform the user. + String combinedId = logRecord.getCombinedId(); + switch (logRecord.getTransactionState()) { + case MIXED -> diagnosticLog.warn(ErrorCodes.TRANSACTION_IN_MIXED_STATE, null, combinedId); + case HAZARD -> diagnosticLog.warn(ErrorCodes.TRANSACTION_IN_HAZARD_STATE, null, combinedId); + } + iterator.remove(); // consider it handled, as we have warned the user + } + } + } + + if (!recoverSuccess) { + diagnosticLog.error(ErrorCodes.TRANSACTION_STARTUP_RECOVERY_FAILED, null); + } + + // notify the user of all startup recovery warns and errors + if (!diagnosticLog.getDiagnosticList().isEmpty()) { + RuntimeUtils.handleDiagnosticErrors(diagnosticLog); + } + return recoverSuccess; + } + + /** + * Recovers a failed transactions in all XAResources. + * + * @param xid the xid of the transaction + * @param state the state of the transaction + * @return true if all transactions are recovered successfully from all resources, false otherwise + */ + private boolean recoverFailedTrxInAllResources(Xid xid, RecoveryState state) { + boolean allResourcesRecovered = true; + for (XAResource xaResource : xaResources) { + boolean recoveredResource = recoverFailedTrxInXAResource(xaResource, xid, state); + allResourcesRecovered = recoveredResource && allResourcesRecovered; + } + return allResourcesRecovered; + } + + private boolean recoverFailedTrxInXAResource(XAResource xaResource, Xid xid, RecoveryState state) { + return switch (state) { + case PREPARING, ABORTING -> handleAbort(xaResource, xid); + case COMMITTING -> replayCommit(xaResource, xid); + case COMMITTED, ABORTED -> { + forgetXidInXaResource(xid, xaResource); + yield true; + } + default -> false; + }; + } + + /** + * Handle commit of a transaction in transaction recovery. + * + * @param xaResource the resource that the transaction is associated with + * @param xid the xid of the transaction + * @return true if the commit is successful and the log can be forgotten, false otherwise + */ + private boolean replayCommit(XAResource xaResource, Xid xid) { + try { + xaResource.commit(xid, false); + return true; + } catch (XAException e) { + return switch (e.errorCode) { + // case: transaction already heuristically terminated by resource + case XAException.XA_HEURCOM, XAException.XA_HEURHAZ, XAException.XA_HEURMIX, XAException.XA_HEURRB -> + handleHeuristicTermination(xid, xaResource, e, true); + // case : transaction terminated in resource by a concurrent commit; xid no longer know by resource + case XAException.XAER_NOTA, XAException.XAER_INVAL -> true; + default -> { + log.error("transient error while replaying commit for transaction: " + xid + " " + e.getMessage()); + yield false; + } + }; + } + } + + /** + * Handle abort of a transaction in transaction recovery. + * + * @param xaResource the resource that the transaction is associated with + * @param xid the xid of the transaction + * @return true if the abort is successful and the log can be forgotten, false otherwise + */ + private boolean handleAbort(XAResource xaResource, Xid xid) { + try { + xaResource.rollback(xid); + return true; + } catch (XAException e) { + return switch (e.errorCode) { + // case: transaction already heuristically terminated by resource + case XAException.XA_HEURCOM, XAException.XA_HEURHAZ, XAException.XA_HEURMIX, XAException.XA_HEURRB -> + handleHeuristicTermination(xid, xaResource, e, false); + // case : transaction terminated in resource by a concurrent rollback; xid no longer know by resource + case XAException.XAER_NOTA, XAException.XAER_INVAL -> true; + default -> { + log.error("transient error while replaying abort for transaction: " + xid + " " + e.getMessage()); + yield false; + } + }; + } + } + + /** + * Handle heuristic termination of a transaction. + * + * @param xid the xid of the transaction + * @param xaResource the resource that the transaction is associated with + * @param e the XAException that was thrown + * @param decisionCommit the decision that was made for that specific transaction + * @return true if the log can be forgotten, false otherwise + *

+ * "heuristic" itself means "by hand", and that is the way that these outcomes have to be handled. Consider the + * following possible cases: + * 1. If the decision was to commit, the transaction is heuristically committed, the log + * can be forgotten. + * 2. If the decision was to rollback, the transaction is heuristically rolled back, the log can + * be forgotten. + * 3. If the decision was to commit, but the transaction is heuristically rolled back, or vise versa, + * it needs to be handled manually. + * 4. If the decision was to commit/rollback, but the transaction is heuristically + * mixed, or is in hazard state, it needs to be handled manually. + */ + private boolean handleHeuristicTermination(Xid xid, XAResource xaResource, XAException e, boolean decisionCommit) { + boolean shouldForgetXid = true; + + switch (e.errorCode) { + case XAException.XA_HEURCOM: + if (!decisionCommit) { + reportUserOfHeuristics(e, xid, decisionCommit); + } + break; + case XAException.XA_HEURRB: + if (decisionCommit) { + reportUserOfHeuristics(e, xid, decisionCommit); + } + break; + case XAException.XA_HEURMIX: + reportUserOfHeuristics(e, xid, decisionCommit); + break; + case XAException.XA_HEURHAZ: + reportUserOfHeuristics(e, xid, decisionCommit); + shouldForgetXid = false; + break; + } + + if (shouldForgetXid) { + forgetXidInXaResource(xid, xaResource); + } + return shouldForgetXid; + } + + /** + * Reports the user of heuristic termination of a transaction. + * + * @param e the XAException that was thrown + * @param xid the xid of the transaction + * @param decisionCommit the decision that was made for that specific transaction + */ + private void reportUserOfHeuristics(XAException e, Xid xid, boolean decisionCommit) { + String transactionID = new String(xid.getGlobalTransactionId()); + String transactionBlockId = new String(xid.getBranchQualifier()); + String combinedId = transactionID + ":" + transactionBlockId; + String decision = decisionCommit ? "commit" : "rollback"; + switch (e.errorCode) { + case XAException.XA_HEURCOM -> diagnosticLog.warn(ErrorCodes.TRANSACTION_IN_HUERISTIC_STATE, null, + combinedId, "heuristic commit", decision); + case XAException.XA_HEURRB -> diagnosticLog.warn(ErrorCodes.TRANSACTION_IN_HUERISTIC_STATE, null, + combinedId, "heuristic rollback", decision); + case XAException.XA_HEURMIX -> diagnosticLog.warn(ErrorCodes.TRANSACTION_IN_HUERISTIC_STATE, null, + combinedId, "heuristic mixed", decision); + case XAException.XA_HEURHAZ -> diagnosticLog.warn(ErrorCodes.TRANSACTION_IN_HUERISTIC_STATE, null, + combinedId, "heuristic hazard", decision); + } + } + + /** + * Forgets a xid in a xa resource. + * + * @param xid the xid to forget + * @param xaResource the resource to forget the xid in + */ + private void forgetXidInXaResource(Xid xid, XAResource xaResource) { + try { + xaResource.forget(xid); + } catch (XAException e) { + // ignore. worst case, heuristic xid is present again on next recovery scan + } + } +} diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryState.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryState.java new file mode 100644 index 000000000000..6d7a6347e8c0 --- /dev/null +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/RecoveryState.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.transactions; + +/** + * {@code RecoveryState} Defines the recovery states of a transaction. + * + * @since 2201.9.0 + */ +public enum RecoveryState { + // prepare record + PREPARING("PREPARING"), + // decision records + COMMITTING("COMMITTING"), + ABORTING("ABORTING"), + // outcome records + COMMITTED("COMMITTED"), + ABORTED("ABORTED"), + MIXED("MIXED"), + HAZARD("HAZARD"), + // done record + TERMINATED("TERMINATED"); + + private final String state; + + RecoveryState(String state) { + this.state = state; + } + + public String getState() { + return state; + } + + /** + * Get the recovery state for the given state. + * + * @param state the state as a string + * @return the recovery state + */ + public static RecoveryState getRecoveryState(String state) { + for (RecoveryState recoveryState : RecoveryState.values()) { + if (recoveryState.getState().equals(state)) { + return recoveryState; + } + } + return null; + } + + @Override + public String toString() { + return state; + } +} diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionConstants.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionConstants.java index e8cfd5af24ac..63797eee1283 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionConstants.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionConstants.java @@ -71,6 +71,11 @@ public final class TransactionConstants { public static final String ANN_NAME_TRX_PARTICIPANT_CONFIG = "Participant"; public static final String TIMESTAMP_OBJECT_VALUE_FIELD = "timeValue"; + + public static final int DEFAULT_CHECKPOINT_INTERVAL = 10000; + public static final int NO_CHECKPOINT_INTERVAL = -1; + public static final int IN_MEMORY_CHECKPOINT_INTERVAL = 25; + public static final String ERROR_MESSAGE_PREFIX = "error:"; public static final int DEFAULT_TRX_AUTO_COMMIT_TIMEOUT = 120; public static final int DEFAULT_TRX_CLEANUP_TIMEOUT = 600; diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionLogRecord.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionLogRecord.java new file mode 100644 index 000000000000..8e8856d14db3 --- /dev/null +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionLogRecord.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.transactions; + +/** + * {@code TransactionLogRecord} represents a transaction log record. + * + * @since 2201.9.0 + */ +public class TransactionLogRecord { + + private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + private static final String FIELD_SEPARATOR = "|"; + private static final String COMBINED_ID_SEPARATOR = ":"; + + public String transactionId; + private String transactionBlockId; + private RecoveryState transactionStatus; + private long logTime; + + /** + * Create a transaction log record. + * + * @param transactionId the transaction id + * @param transactionBlockId the transaction block id + * @param transactionStatus the current status of the transaction + */ + + public TransactionLogRecord(String transactionId, String transactionBlockId, RecoveryState transactionStatus) { + this.transactionId = transactionId; + this.transactionBlockId = transactionBlockId; + this.transactionStatus = transactionStatus; + this.logTime = System.currentTimeMillis(); + } + + public TransactionLogRecord(String transactionId, String transactionBlockId, RecoveryState transactionStatus, + long logTime) { + this.transactionId = transactionId; + this.transactionBlockId = transactionBlockId; + this.transactionStatus = transactionStatus; + this.logTime = logTime; + } + + public RecoveryState getTransactionState() { + return transactionStatus; + } + + public String getTransactionId() { + return transactionId; + } + + public String getTransactionBlockId() { + return transactionBlockId; + } + + public String getCombinedId() { + return transactionId + ":" + transactionBlockId; + } + + /** + * Get the transaction log record as a string to write to file recovery log. + * + * @return the transaction log record as a string + */ + public String getTransactionLogRecordString() { + return transactionId + COMBINED_ID_SEPARATOR + transactionBlockId + FIELD_SEPARATOR + transactionStatus + + FIELD_SEPARATOR + logTime + LINE_SEPARATOR; + } + + /** + * Parse a transaction log record from a log line. + * + * @param logLine the log as a string + * @return the transaction log record + */ + public static TransactionLogRecord parseTransactionLogRecord(String logLine) { + logLine = logLine.stripTrailing(); + String[] logBlocks = logLine.split("\\|"); + if (logBlocks.length == 3) { + String[] combinedId = logBlocks[0].split(COMBINED_ID_SEPARATOR); + String transactionStatusString = logBlocks[1]; + Long logTime = Long.parseLong(logBlocks[2]); + String transactionId = combinedId[0]; + String transactionBlockId = combinedId[1]; + RecoveryState transactionStatus = RecoveryState.getRecoveryState(transactionStatusString); + return new TransactionLogRecord(transactionId, transactionBlockId, transactionStatus, logTime); + } + return null; + } + + /** + * Check whether the transaction is in a final state. + * + * @return true if the transaction is completed + */ + public boolean isCompleted() { + return transactionStatus.equals(RecoveryState.TERMINATED); + } +} diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionResourceManager.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionResourceManager.java index 35eec8a9c1c9..cc383b5af75d 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionResourceManager.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/TransactionResourceManager.java @@ -27,6 +27,8 @@ import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.internal.configurable.ConfigMap; import io.ballerina.runtime.internal.configurable.VariableKey; +import io.ballerina.runtime.internal.diagnostics.RuntimeDiagnosticLog; +import io.ballerina.runtime.internal.errors.ErrorCodes; import io.ballerina.runtime.internal.scheduling.Scheduler; import io.ballerina.runtime.internal.scheduling.Strand; import io.ballerina.runtime.internal.util.RuntimeUtils; @@ -62,6 +64,9 @@ import static io.ballerina.runtime.api.constants.RuntimeConstants.BALLERINA_BUILTIN_PKG_PREFIX; import static io.ballerina.runtime.transactions.TransactionConstants.DEFAULT_TRX_AUTO_COMMIT_TIMEOUT; import static io.ballerina.runtime.transactions.TransactionConstants.DEFAULT_TRX_CLEANUP_TIMEOUT; +import static io.ballerina.runtime.transactions.TransactionConstants.DEFAULT_CHECKPOINT_INTERVAL; +import static io.ballerina.runtime.transactions.TransactionConstants.ERROR_MESSAGE_PREFIX; +import static io.ballerina.runtime.transactions.TransactionConstants.NO_CHECKPOINT_INTERVAL; import static io.ballerina.runtime.transactions.TransactionConstants.TRANSACTION_PACKAGE_ID; import static io.ballerina.runtime.transactions.TransactionConstants.TRANSACTION_PACKAGE_NAME; import static io.ballerina.runtime.transactions.TransactionConstants.TRANSACTION_PACKAGE_VERSION; @@ -107,6 +112,11 @@ public class TransactionResourceManager { private static final PrintStream STDERR = System.err; final Map transactionInfoMap = new ConcurrentHashMap<>(); + private LogManager logManager; + private RecoveryManager recoveryManager; + private boolean startupRecoverySuccessful = false; + + RuntimeDiagnosticLog diagnosticLog = new RuntimeDiagnosticLog(); private TransactionResourceManager() { transactionManagerEnabled = getTransactionManagerEnabled(); @@ -116,6 +126,12 @@ private TransactionResourceManager() { userTransactionManager = new UserTransactionManager(); } else { xidRegistry = new HashMap<>(); + logManager = LogManager.getInstance(getRecoveryLogBaseName(), getCheckpointInterval(), + getRecoveryLogDir(), getDeleteOldLogs()); + recoveryManager = new RecoveryManager(); + if (!diagnosticLog.getDiagnosticList().isEmpty()) { + RuntimeUtils.handleDiagnosticErrors(diagnosticLog); + } } } @@ -130,9 +146,20 @@ public static TransactionResourceManager getInstance() { return transactionResourceManager; } + public LogManager getLogManager() { + return transactionResourceManager.logManager; + } + + public RecoveryManager getRecoveryManager() { + return transactionResourceManager.recoveryManager; + } + + public Map> getResourceRegistry() { + return transactionResourceManager.resourceRegistry; + } + /** * This method sets values for atomikos transaction log path and name properties using the available configs. - * */ private void setLogProperties() { final Path projectRoot = Path.of(RuntimeUtils.USER_DIR); @@ -150,7 +177,7 @@ private void setLogProperties() { try { Files.createDirectory(transactionLogDirectory); } catch (IOException e) { - STDERR.println("error: failed to create transaction log directory in " + logDir); + STDERR.println(ERROR_MESSAGE_PREFIX + " failed to create transaction log directory in " + logDir); } } System.setProperty(ATOMIKOS_LOG_BASE_PROPERTY, logDir); @@ -238,6 +265,86 @@ private static int parseTimeoutValue(Object configValue, int defaultValue) { return timeoutValue; } + /** + * This method gets the user specified config for ballerina recovery log name. + * + * @return string recovery log file name + */ + private String getRecoveryLogBaseName() { + VariableKey recoveryLogNameKey = + new VariableKey(TRANSACTION_PACKAGE_ID, "recoveryLogName", PredefinedTypes.TYPE_STRING, false); + if (!ConfigMap.containsKey(recoveryLogNameKey)) { + return "recoveryLog"; + } + return ((BString) ConfigMap.get(recoveryLogNameKey)).getValue(); + } + + /** + * This method gets the user specified config for ballerina recovery log directory. + * + * @return string recovery log directory + */ + private Path getRecoveryLogDir() { + final Path projectRoot = Path.of(RuntimeUtils.USER_DIR); + VariableKey recoveryLogDirKey = + new VariableKey(TRANSACTION_PACKAGE_ID, "recoveryLogDir", PredefinedTypes.TYPE_STRING, false); + if (!ConfigMap.containsKey(recoveryLogDirKey)) { + return projectRoot; + } + String logDir = ((BString) ConfigMap.get(recoveryLogDirKey)).getValue(); + Path logDirPath = Path.of(logDir); + if (!logDirPath.isAbsolute()) { + logDir = projectRoot.toAbsolutePath().toString() + File.separatorChar + logDir; + return Path.of(logDir); + } + return logDirPath; + } + + /** + * This method gets the user specified config for checkpoint interval. + * + * @return int checkpoint interval + */ + private Integer getCheckpointInterval() { + VariableKey checkpointIntervalKey = + new VariableKey(TRANSACTION_PACKAGE_ID, "checkpointInterval", PredefinedTypes.TYPE_INT, false); + if (!ConfigMap.containsKey(checkpointIntervalKey)) { + return DEFAULT_CHECKPOINT_INTERVAL; + } else { + int checkpointInterval; + Object value = ConfigMap.get(checkpointIntervalKey); + if (value instanceof Long) { + checkpointInterval = ((Long) value).intValue(); + } else if (value instanceof Integer) { + checkpointInterval = (Integer) value; + } else { + diagnosticLog.warn(ErrorCodes.TRANSACTION_INVALID_CHECKPOINT_VALUE, null, DEFAULT_CHECKPOINT_INTERVAL); + return DEFAULT_CHECKPOINT_INTERVAL; + } + if (checkpointInterval < 0 && checkpointInterval != NO_CHECKPOINT_INTERVAL) { + diagnosticLog.warn(ErrorCodes.TRANSACTION_INVALID_CHECKPOINT_VALUE, null, DEFAULT_CHECKPOINT_INTERVAL); + return DEFAULT_CHECKPOINT_INTERVAL; + } else { + return checkpointInterval; + } + } + } + + /** + * This method gets the user specified config for whether to delete old logs or not. + * + * @return boolean whether to delete old logs or not + */ + public boolean getDeleteOldLogs() { + VariableKey deleteOldLogsKey = new VariableKey(TRANSACTION_PACKAGE_ID, "deleteOldLogs", + PredefinedTypes.TYPE_BOOLEAN, false); + if (!ConfigMap.containsKey(deleteOldLogsKey)) { + return true; + } else { + return (boolean) ConfigMap.get(deleteOldLogsKey); + } + } + /** * This method will register connection resources with a particular transaction. * @@ -254,7 +361,7 @@ public void register(String transactionId, String transactionBlockId, BallerinaT * This method will register a committed function handler of a particular transaction. * * @param transactionBlockId the block id of the transaction - * @param fpValue the function pointer for the committed function + * @param fpValue the function pointer for the committed function */ public void registerCommittedFunction(String transactionBlockId, BFunctionPointer fpValue) { if (fpValue != null) { @@ -266,7 +373,7 @@ public void registerCommittedFunction(String transactionBlockId, BFunctionPointe * This method will register an aborted function handler of a particular transaction. * * @param transactionBlockId the block id of the transaction - * @param fpValue the function pointer for the aborted function + * @param fpValue the function pointer for the aborted function */ public void registerAbortedFunction(String transactionBlockId, BFunctionPointer fpValue) { if (fpValue != null) { @@ -289,7 +396,7 @@ public void registerParticipation(String gTransactionId, String transactionBlock } /** - * This method acts as the callback which notify all the resources participated in the given transaction. + * This method acts as the callback which notify all the resources participated in the given transaction. * * @param transactionId the global transaction id * @param transactionBlockId the block id of the transaction @@ -347,7 +454,7 @@ public boolean notifyCommit(String transactionId, String transactionBlockId) { trx.commit(); } } catch (SystemException | HeuristicMixedException | HeuristicRollbackException - | RollbackException e) { + | RollbackException e) { LOG.error("error when committing transaction " + transactionId + ":" + e.getMessage(), e); commitSuccess = false; } @@ -444,8 +551,8 @@ public boolean notifyAbort(String transactionId, String transactionBlockId) { } /** - * This method starts a transaction for the given xa resource. If there is no transaction is started for the - * given XID a new transaction is created. + * This method starts a transaction for the given xa resource. If there is no transaction is started for the given + * XID a new transaction is created. * * @param transactionId the global transaction id * @param transactionBlockId the block id of the transaction @@ -468,7 +575,7 @@ public void beginXATransaction(String transactionId, String transactionBlockId, } else { Xid xid = xidRegistry.get(combinedId); if (xid == null) { - xid = XIDGenerator.createXID(); + xid = XIDGenerator.createXID(combinedId); xidRegistry.put(combinedId, xid); } try { @@ -480,18 +587,21 @@ public void beginXATransaction(String transactionId, String transactionBlockId, } /** - * Cleanup the Info record keeping state related to current transaction context and remove the current - * context from the stack. + * Cleanup the Info record keeping state related to current transaction context and remove the current context from + * the stack. */ public void cleanupTransactionContext() { Strand strand = Scheduler.getStrand(); TransactionLocalContext transactionLocalContext = strand.currentTrxContext; + writeToLog(transactionLocalContext.getGlobalTransactionId(), + transactionLocalContext.getCurrentTransactionBlockId(), RecoveryState.TERMINATED); transactionLocalContext.removeTransactionInfo(); strand.removeCurrentTrxContext(); } /** * This method returns true if there is a failure of the current transaction, otherwise false. + * * @return true if there is a failure of the current transaction. */ public boolean getAndClearFailure() { @@ -499,8 +609,9 @@ public boolean getAndClearFailure() { } /** - * This method is used to get the error which is set by calling setRollbackOnly(). - * If it is not set, then returns null. + * This method is used to get the error which is set by calling setRollbackOnly(). If it is not set, then returns + * null. + * * @return the error or null. */ public Object getRollBackOnlyError() { @@ -510,6 +621,7 @@ public Object getRollBackOnlyError() { /** * This method checks if the current strand is in a transaction or not. + * * @return True if the current strand is in a transaction. */ public boolean isInTransaction() { @@ -518,6 +630,7 @@ public boolean isInTransaction() { /** * This method notify the given transaction to abort. + * * @param transactionBlockId The transaction blockId */ public void notifyTransactionAbort(String transactionBlockId) { @@ -526,6 +639,7 @@ public void notifyTransactionAbort(String transactionBlockId) { /** * This method retrieves the list of rollback handlers. + * * @return Array of rollback handlers */ public BArray getRegisteredRollbackHandlerList() { @@ -542,6 +656,7 @@ public BArray getRegisteredRollbackHandlerList() { /** * This method retrieves the list of commit handlers. + * * @return Array of commit handlers */ public BArray getRegisteredCommitHandlerList() { @@ -573,6 +688,7 @@ public void setContextNonTransactional() { /** * This method set the given transaction context as the current transaction context in the stack. + * * @param trxCtx The input transaction context */ public void setCurrentTransactionContext(TransactionLocalContext trxCtx) { @@ -581,6 +697,7 @@ public void setCurrentTransactionContext(TransactionLocalContext trxCtx) { /** * This method returns the current transaction context. + * * @return The current Transaction Context */ public TransactionLocalContext getCurrentTransactionContext() { @@ -669,4 +786,33 @@ public Object getTransactionRecord(BArray xid) { return null; } } + + /** + * Handles initial recovery after a crash. This method is called after all the resources are added and before a new + * transaction begins. + */ + public synchronized void startupCrashRecovery() { + if (!startupRecoverySuccessful) { + boolean allRecovered = recoveryManager.performRecoveryPass(); + if (allRecovered) { + startupRecoverySuccessful = true; + } + } + } + + /** + * This method writes a transaction log record to the recovery log file. Skips if the atomikos tm is used. + * + * @param globalTransactionId the global transaction id + * @param currentTransactionBlockId the block id of the transaction + * @param recoveryState the state of the transaction + */ + public void writeToLog(String globalTransactionId, String currentTransactionBlockId, RecoveryState recoveryState) { + if (transactionManagerEnabled) { + return; + } + TransactionLogRecord logRecord = new TransactionLogRecord(globalTransactionId, currentTransactionBlockId, + recoveryState); + getInstance().getLogManager().put(logRecord); + } } diff --git a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/XIDGenerator.java b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/XIDGenerator.java index 4131ba35b8d5..2eda372c3a1f 100644 --- a/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/XIDGenerator.java +++ b/bvm/ballerina-runtime/src/main/java/io/ballerina/runtime/transactions/XIDGenerator.java @@ -17,8 +17,6 @@ */ package io.ballerina.runtime.transactions; -import java.security.SecureRandom; -import java.util.concurrent.atomic.AtomicInteger; /** * Generates XID for the distributed transactions. @@ -26,20 +24,18 @@ * @since 1.0 */ public final class XIDGenerator { + // DEFAULT_FORMAT will be unique for ballerina but same for each transaction + private static final int DEFAULT_FORMAT = ('B' << 24) + ('A' << 16) + ('L' << 8); - private static final SecureRandom secureRand = new SecureRandom(); - private static final AtomicInteger formatIdIdGenerator = new AtomicInteger(); - - private static byte[] randomBytes() { - final byte[] bytes = new byte[48]; - secureRand.nextBytes(bytes); - return bytes; + static XATransactionID createXID(String combinedId) { + String[] trxId = (combinedId.split("_")[0]).split(":"); + final byte[] branchQualifier = trxId[1].getBytes(); + final byte[] globalTransactionId = trxId[0].getBytes(); + return new XATransactionID(DEFAULT_FORMAT, branchQualifier, globalTransactionId); } - static XATransactionID createXID() { - final byte[] branchQualifier = randomBytes(); - final byte[] globalTransactionId = randomBytes(); - return new XATransactionID(formatIdIdGenerator.incrementAndGet(), branchQualifier, globalTransactionId); + public static int getDefaultFormat() { + return DEFAULT_FORMAT; } private XIDGenerator() { diff --git a/bvm/ballerina-runtime/src/main/resources/MessagesBundle.properties b/bvm/ballerina-runtime/src/main/resources/MessagesBundle.properties index a7caa9feeb3d..14822f24a2d2 100644 --- a/bvm/ballerina-runtime/src/main/resources/MessagesBundle.properties +++ b/bvm/ballerina-runtime/src/main/resources/MessagesBundle.properties @@ -276,3 +276,14 @@ config.env.variable.name.ambiguity = configurable environment variable ''{0}'' c no.worker.message.received = no message received from worker ''{0}'' to worker ''{1}'' invalid.tuple.member.size = the number of members in a tuple value should be greater than the member types of the tuple \ to perform a ''{0}'' operation + +#transactions +transaction.invalid.checkpoint.value = invalid value provided for checkpoint interval. using default value ''{0}'' +transaction.in.heuristic.state = transaction ''{0}'' in ''{1}'' state in participant/resource(s) when decision was to ''{2}'' +transaction.in.hazard.state = transaction ''{0}'' in hazard state. check your data for consistency. +transaction.in.mixed.state = transaction ''{0}'' has mixed outcomes. check your data for consistency. +transaction.startup.recovery.failed = transaction recovery on startup failed. will try again on next request. \ + if you keep seeing this, clean the logs manually and restart the server. +transaction.cannot.create.log.file = cannot create transaction log file ''{0}'' +transaction.cannot.collect.xids.in.resource = could not retrieve prepared xids from resource ''{0}'' +transaction.cannot.parse.log.record = cannot parse transaction log record from line ''{0}'' diff --git a/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/LogManagerTests.java b/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/LogManagerTests.java new file mode 100644 index 000000000000..0245e68ab45a --- /dev/null +++ b/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/LogManagerTests.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.test.transactions; + +import io.ballerina.runtime.transactions.FileRecoveryLog; +import io.ballerina.runtime.transactions.InMemoryRecoveryLog; +import io.ballerina.runtime.transactions.LogManager; +import io.ballerina.runtime.transactions.RecoveryState; +import io.ballerina.runtime.transactions.TransactionLogRecord; +import org.testng.Assert; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +public class LogManagerTests { + + private LogManager logManager; + private TransactionLogRecord logRecord; + private Path recoveryLogDir = Path.of("build/tmp/test/recovery/logManagerTestLogs"); + + @BeforeSuite + public void setup() { + logManager = LogManager.getInstance("testLog", -1, recoveryLogDir, false); + logRecord = new TransactionLogRecord("00000000-0000-0000-0000-000000000000", "0", + RecoveryState.PREPARING); + } + + @Test(description = "Test adding logs records to both in-memory and file recovery logs") + public void testPut() throws Exception { + logManager.put(logRecord); + Field inMemoryRecoveryLogField = LogManager.class.getDeclaredField("inMemoryRecoveryLog"); + inMemoryRecoveryLogField.setAccessible(true); + InMemoryRecoveryLog inMemoryRecoveryLog = (InMemoryRecoveryLog) inMemoryRecoveryLogField.get(logManager); + Map inMemoryLogs = inMemoryRecoveryLog.getAllLogs(); + Field fileRecoveryLogField = LogManager.class.getDeclaredField("fileRecoveryLog"); + fileRecoveryLogField.setAccessible(true); + FileRecoveryLog fileRecoveryLog = (FileRecoveryLog) fileRecoveryLogField.get(logManager); + Map fileLogs = fileRecoveryLog.getPendingLogs(); + + Assert.assertTrue(inMemoryLogs.containsKey(logRecord.getCombinedId())); + Assert.assertTrue(fileLogs.containsKey(logRecord.getCombinedId())); + } + + @Test(description = "Test getting failed transaction logs from the log manager") + public void testGetFailedTransactionLogs() { + logManager.put(logRecord); + Map failedTransactions = logManager.getFailedTransactionLogs(); + Assert.assertTrue(failedTransactions.containsValue(logRecord)); + } + + @AfterSuite + public void tearDown() throws IOException { + logManager.close(); + if (Files.exists(recoveryLogDir)) { + Files.walk(recoveryLogDir).sorted((o1, o2) -> -o1.compareTo(o2)).forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } +} diff --git a/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/RecoveryManagerTests.java b/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/RecoveryManagerTests.java new file mode 100644 index 000000000000..99583130ba8e --- /dev/null +++ b/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/RecoveryManagerTests.java @@ -0,0 +1,358 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.test.transactions; + +import io.ballerina.runtime.internal.diagnostics.RuntimeDiagnostic; +import io.ballerina.runtime.internal.diagnostics.RuntimeDiagnosticLog; +import io.ballerina.runtime.internal.errors.ErrorCodes; +import io.ballerina.runtime.transactions.LogManager; +import io.ballerina.runtime.transactions.RecoveryManager; +import io.ballerina.runtime.transactions.RecoveryState; +import io.ballerina.runtime.transactions.TransactionLogRecord; +import io.ballerina.runtime.transactions.TransactionResourceManager; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +public class RecoveryManagerTests { + private RecoveryManager recoveryManager; + private XAResource xaResource; + private Map transactionLogRecords; + + @BeforeMethod + public void setup() throws NoSuchFieldException, IllegalAccessException { + TransactionResourceManager transactionResourceManager = TransactionResourceManager.getInstance(); + recoveryManager = new RecoveryManager(); + Field logManagerField = TransactionResourceManager.class.getDeclaredField("logManager"); + logManagerField.setAccessible(true); + LogManager logManager = LogManager.getInstance("testLog", -1, + Path.of("build/tmp/test/recovery/testRecoveryLogs"), true); + logManagerField.set(transactionResourceManager, logManager); + + Field field = TransactionResourceManager.class.getDeclaredField("recoveryManager"); + field.setAccessible(true); + field.set(transactionResourceManager, recoveryManager); + xaResource = Mockito.mock(XAResource.class); + transactionLogRecords = new HashMap<>(); + TransactionLogRecord logRecord = Mockito.mock(TransactionLogRecord.class); + transactionLogRecords.put("testTransaction", logRecord); + } + + @Test (description = "Test adding XA resource to be recovered to the recovery manager") + public void testAddXAResourceToRecover() throws NoSuchFieldException, IllegalAccessException { + recoveryManager.addXAResourceToRecover(xaResource); + Field field = RecoveryManager.class.getDeclaredField("xaResources"); + field.setAccessible(true); + Collection xaResources = (Collection) field.get(recoveryManager); + Assert.assertTrue(xaResources.contains(xaResource)); + } + + @Test (description = "Test adding failed transaction records to the recovery manager") + public void testRetrievingTransactionsToRecover() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Map transactionLogRecords = new HashMap<>(); + TransactionLogRecord logRecord = Mockito.mock(TransactionLogRecord.class); + transactionLogRecords.put("testTransaction", logRecord); + Method method = RecoveryManager.class.getDeclaredMethod("putFailedTransactionRecords", Map.class); + method.setAccessible(true); + method.invoke(recoveryManager, transactionLogRecords); + + Map pendingRecords = recoveryManager.getPendingTransactionRecords(); + Assert.assertEquals(pendingRecords, transactionLogRecords); + } + + @Test (description = "Test recovery pass when there are no failed transactions") + public void testRecoveryPassWithNoFailedTransactions() throws NoSuchFieldException, IllegalAccessException { + recoveryManager.addXAResourceToRecover(xaResource); + Map failedTransactions = new HashMap<>(); + Field failedTransactionsField = RecoveryManager.class.getDeclaredField("failedTransactions"); + failedTransactionsField.setAccessible(true); + failedTransactionsField.set(recoveryManager, failedTransactions); + + Assert.assertTrue(recoveryManager.performRecoveryPass()); + } + + @Test (description = "Test startup recovery pass recovering from all recoverable states") + public void testRecoverFromRecoverableStates() throws NoSuchFieldException, IllegalAccessException { + recoveryManager.addXAResourceToRecover(xaResource); + + LogManager logManager = Mockito.mock(LogManager.class); + Map failedTransactions = new HashMap<>(); + List states = Arrays.asList(RecoveryState.PREPARING, RecoveryState.COMMITTING, + RecoveryState.ABORTING, RecoveryState.COMMITTED, RecoveryState.ABORTED); + for (RecoveryState state : states) { + String trxId = "trxId" + state.name(); + String trxBlockId = "trxBlockId" + state.name(); + TransactionLogRecord record = Mockito.mock(TransactionLogRecord.class); + Mockito.when(record.getTransactionId()).thenReturn(trxId); + Mockito.when(record.getTransactionBlockId()).thenReturn(trxBlockId); + Mockito.when(record.getTransactionState()).thenReturn(state); + Mockito.when(record.getCombinedId()).thenReturn(trxId + ":" + trxBlockId); + failedTransactions.put(record.getCombinedId(), record); + } + Mockito.when(logManager.getFailedTransactionLogs()).thenReturn(failedTransactions); + Field failedTransactionsField = RecoveryManager.class.getDeclaredField("failedTransactions"); + failedTransactionsField.setAccessible(true); + failedTransactionsField.set(recoveryManager, failedTransactions); + + recoveryManager.performRecoveryPass(); // perform recovery pass + + Map failedTransactionAfterRecovery = + (Map) failedTransactionsField.get(recoveryManager); + Assert.assertTrue(failedTransactionAfterRecovery.isEmpty()); // means recovery was successful + } + + @Test (description = "Test startup recovery pass recovering from hazard and mixed states") + public void testRecoveryForHazardMixedStates() throws NoSuchFieldException, IllegalAccessException { + recoveryManager.addXAResourceToRecover(xaResource); + + LogManager logManager = Mockito.mock(LogManager.class); + Map failedTransactions = new HashMap<>(); + + List states = Arrays.asList(RecoveryState.HAZARD, RecoveryState.MIXED); + + for (RecoveryState state : states) { + String trxId = "trxId" + state.name(); + String trxBlockId = "trxBlockId" + state.name(); + TransactionLogRecord record = Mockito.mock(TransactionLogRecord.class); + Mockito.when(record.getTransactionId()).thenReturn(trxId); + Mockito.when(record.getTransactionBlockId()).thenReturn(trxBlockId); + Mockito.when(record.getTransactionState()).thenReturn(state); + Mockito.when(record.getCombinedId()).thenReturn(trxId + ":" + trxBlockId); + failedTransactions.put(record.getCombinedId(), record); + } + + Mockito.when(logManager.getFailedTransactionLogs()).thenReturn(failedTransactions); + Field failedTransactionsField = RecoveryManager.class.getDeclaredField("failedTransactions"); + failedTransactionsField.setAccessible(true); + failedTransactionsField.set(recoveryManager, failedTransactions); + + Field diagnosticLogsField = RecoveryManager.class.getDeclaredField("diagnosticLog"); + diagnosticLogsField.setAccessible(true); + + recoveryManager.performRecoveryPass(); + + Map failedTransactionAfterRecovery = + (Map) failedTransactionsField.get(recoveryManager); + RuntimeDiagnosticLog diagnosticLogs = (RuntimeDiagnosticLog) diagnosticLogsField.get(recoveryManager); + List diagnosticList = diagnosticLogs.getDiagnosticList(); + + Assert.assertEquals(diagnosticList.size(), 2); + Assert.assertTrue(failedTransactionAfterRecovery.isEmpty()); // means user was warned and recovery successful + } + + @Test (description = "Test handling recovery when decision was commit and resource has heur committed") + public void testHeuristicTerminationWhenDecisionCommitWithHeurCommit() + throws IllegalAccessException, NoSuchMethodException, + InvocationTargetException { + Xid mockXid = Mockito.mock(Xid.class); + XAResource mockXaResource = Mockito.mock(XAResource.class); + XAException mockXaException = Mockito.mock(XAException.class); + + mockXaException.errorCode = XAException.XA_HEURCOM; + + Method handleHeuristicTerminationMethod = RecoveryManager.class.getDeclaredMethod( + "handleHeuristicTermination", Xid.class, XAResource.class, XAException.class, + boolean.class); + handleHeuristicTerminationMethod.setAccessible(true); + + // Invoke the method with a decisionCommit value of true + boolean result = (boolean) handleHeuristicTerminationMethod.invoke(recoveryManager, mockXid, + mockXaResource, mockXaException, true); + Assert.assertTrue(result); + } + + @Test (description = "Test handling recovery when decision was commit and resource has heur rolledback") + public void testHeuristicTerminationWhenDecisionCommitWithHeurRollback() + throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, + InvocationTargetException { + + Xid mockXid = Mockito.mock(Xid.class); + Mockito.when(mockXid.getGlobalTransactionId()).thenReturn(new byte[0]); + Mockito.when(mockXid.getBranchQualifier()).thenReturn(new byte[0]); + XAResource mockXaResource = Mockito.mock(XAResource.class); + XAException mockXaException = Mockito.mock(XAException.class); + + mockXaException.errorCode = XAException.XA_HEURRB; + + Method handleHeuristicTerminationMethod = RecoveryManager.class.getDeclaredMethod( + "handleHeuristicTermination", Xid.class, XAResource.class, XAException.class, boolean.class); + handleHeuristicTerminationMethod.setAccessible(true); + + // Invoke with a decisionCommit value of true + boolean result = (boolean) handleHeuristicTerminationMethod.invoke( + recoveryManager, mockXid, mockXaResource, mockXaException, true); + + Field diagnosticLogsField = RecoveryManager.class.getDeclaredField("diagnosticLog"); + diagnosticLogsField.setAccessible(true); + RuntimeDiagnosticLog diagnosticLogs = (RuntimeDiagnosticLog) diagnosticLogsField.get(recoveryManager); + List diagnosticList = diagnosticLogs.getDiagnosticList(); + + Assert.assertTrue(result); + Assert.assertEquals(diagnosticList.size(), 1); + Assert.assertEquals(diagnosticList.get(0).diagnosticInfo().code(), + ErrorCodes.TRANSACTION_IN_HUERISTIC_STATE.diagnosticId()); + } + + @Test (description = "Test handling recovery when decision was rollback and resource has heur rolledback") + public void testHeuristicTerminationWhenDecisionRollbackWithHeurRollback() + throws IllegalAccessException, NoSuchMethodException, + InvocationTargetException { + Xid mockXid = Mockito.mock(Xid.class); + XAResource mockXaResource = Mockito.mock(XAResource.class); + XAException mockXaException = Mockito.mock(XAException.class); + + mockXaException.errorCode = XAException.XA_HEURRB; + + Method handleHeuristicTerminationMethod = RecoveryManager.class.getDeclaredMethod( + "handleHeuristicTermination", Xid.class, XAResource.class, XAException.class, boolean.class); + handleHeuristicTerminationMethod.setAccessible(true); + + // Invoke the method with a decisionCommit value of false + boolean result = (boolean) handleHeuristicTerminationMethod.invoke(recoveryManager, mockXid, + mockXaResource, mockXaException, false); + Assert.assertTrue(result); + } + + @Test (description = "Test handling recovery when decision was rollback and resource has heur committed") + public void testHeuristicTerminationWhenDecisionRollbackWithHeurCommit() + throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, + InvocationTargetException { + + Xid mockXid = Mockito.mock(Xid.class); + Mockito.when(mockXid.getGlobalTransactionId()).thenReturn(new byte[0]); + Mockito.when(mockXid.getBranchQualifier()).thenReturn(new byte[0]); + XAResource mockXaResource = Mockito.mock(XAResource.class); + XAException mockXaException = Mockito.mock(XAException.class); + + mockXaException.errorCode = XAException.XA_HEURCOM; + + Method handleHeuristicTerminationMethod = RecoveryManager.class.getDeclaredMethod( + "handleHeuristicTermination", Xid.class, XAResource.class, XAException.class, boolean.class); + handleHeuristicTerminationMethod.setAccessible(true); + + // Invoke with a decisionCommit value of false + boolean result = (boolean) handleHeuristicTerminationMethod.invoke(recoveryManager, mockXid, + mockXaResource, mockXaException, false); + + Field diagnosticLogsField = RecoveryManager.class.getDeclaredField("diagnosticLog"); + diagnosticLogsField.setAccessible(true); + RuntimeDiagnosticLog diagnosticLogs = (RuntimeDiagnosticLog) diagnosticLogsField.get(recoveryManager); + List diagnosticList = diagnosticLogs.getDiagnosticList(); + + Assert.assertTrue(result); + Assert.assertEquals(diagnosticList.size(), 1); + Assert.assertEquals(diagnosticList.get(0).diagnosticInfo().code(), + ErrorCodes.TRANSACTION_IN_HUERISTIC_STATE.diagnosticId()); + } + + @Test (description = "Test handling recovery when a resource is in mixed state") + public void testHeuristicTerminationWithHuerMixed() + throws NoSuchFieldException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Xid mockXid = Mockito.mock(Xid.class); + Mockito.when(mockXid.getGlobalTransactionId()).thenReturn(new byte[0]); + Mockito.when(mockXid.getBranchQualifier()).thenReturn(new byte[0]); + XAResource mockXaResource = Mockito.mock(XAResource.class); + XAException mockXaException = Mockito.mock(XAException.class); + + mockXaException.errorCode = XAException.XA_HEURMIX; + + Method handleHeuristicTerminationMethod = RecoveryManager.class.getDeclaredMethod( + "handleHeuristicTermination", Xid.class, XAResource.class, XAException.class, boolean.class); + handleHeuristicTerminationMethod.setAccessible(true); + + boolean result = (boolean) handleHeuristicTerminationMethod.invoke(recoveryManager, mockXid, + mockXaResource, mockXaException, true); + + Field diagnosticLogsField = RecoveryManager.class.getDeclaredField("diagnosticLog"); + diagnosticLogsField.setAccessible(true); + RuntimeDiagnosticLog diagnosticLogs = (RuntimeDiagnosticLog) diagnosticLogsField.get(recoveryManager); + List diagnosticList = diagnosticLogs.getDiagnosticList(); + + Assert.assertTrue(result); + Assert.assertEquals(diagnosticList.size(), 1); + Assert.assertEquals(diagnosticList.get(0).diagnosticInfo().code(), + ErrorCodes.TRANSACTION_IN_HUERISTIC_STATE.diagnosticId()); + } + + @Test (description = "Test handling recovery when a resource is in heur hazard") + public void testHeuristicTerminationWithHuerHazard() + throws NoSuchFieldException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Xid mockXid = Mockito.mock(Xid.class); + Mockito.when(mockXid.getGlobalTransactionId()).thenReturn(new byte[0]); + Mockito.when(mockXid.getBranchQualifier()).thenReturn(new byte[0]); + XAResource mockXaResource = Mockito.mock(XAResource.class); + XAException mockXaException = Mockito.mock(XAException.class); + + mockXaException.errorCode = XAException.XA_HEURHAZ; + + Method handleHeuristicTerminationMethod = RecoveryManager.class.getDeclaredMethod( + "handleHeuristicTermination", Xid.class, XAResource.class, XAException.class, boolean.class); + handleHeuristicTerminationMethod.setAccessible(true); + + boolean result = (boolean) handleHeuristicTerminationMethod.invoke(recoveryManager, mockXid, + mockXaResource, mockXaException, true); + + Field diagnosticLogsField = RecoveryManager.class.getDeclaredField("diagnosticLog"); + diagnosticLogsField.setAccessible(true); + RuntimeDiagnosticLog diagnosticLogs = (RuntimeDiagnosticLog) diagnosticLogsField.get(recoveryManager); + List diagnosticList = diagnosticLogs.getDiagnosticList(); + + Assert.assertFalse(result); + Assert.assertEquals(diagnosticList.size(), 1); + Assert.assertEquals(diagnosticList.get(0).diagnosticInfo().code(), + ErrorCodes.TRANSACTION_IN_HUERISTIC_STATE.diagnosticId()); + } + + @Test (description = "Test unable to create file") + + + @AfterTest + public void tearDown() throws IOException { + Path dir = Paths.get("build/tmp/test/recovery/testRecoveryLogs"); + if (Files.exists(dir)) { + Files.walk(dir).sorted((o1, o2) -> -o1.compareTo(o2)).forEach(path -> { + try { + Files.delete(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } +} diff --git a/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/XIDGeneratorTest.java b/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/XIDGeneratorTest.java new file mode 100644 index 000000000000..ad933ecf0d72 --- /dev/null +++ b/bvm/ballerina-runtime/src/test/java/io/ballerina/runtime/test/transactions/XIDGeneratorTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.runtime.test.transactions; + +import io.ballerina.runtime.transactions.XIDGenerator; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import javax.transaction.xa.Xid; + +public class XIDGeneratorTest { + + @Test (description = "Test createXID method in XIDGenerator class") + public void testCreateXID() + throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + String combinedId = "00000000-0000-0000-0000-000000000000:0"; + Class xidGeneratorClass = Class.forName("io.ballerina.runtime.transactions.XIDGenerator"); + Method createXIDMethod = xidGeneratorClass.getDeclaredMethod("createXID", String.class); + createXIDMethod.setAccessible(true); + Xid xid = (Xid) createXIDMethod.invoke(null, combinedId); + Assert.assertNotNull(xid); + Assert.assertEquals(xid.getFormatId(), XIDGenerator.getDefaultFormat()); + Assert.assertEquals(new String(xid.getGlobalTransactionId()), "00000000-0000-0000-0000-000000000000"); + Assert.assertEquals(new String(xid.getBranchQualifier()), "0"); + } +} diff --git a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/desugar/TransactionDesugar.java b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/desugar/TransactionDesugar.java index d81adba0d815..51ab7464a2c4 100644 --- a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/desugar/TransactionDesugar.java +++ b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/desugar/TransactionDesugar.java @@ -81,6 +81,7 @@ import static org.wso2.ballerinalang.compiler.util.Names.START_TRANSACTION; import static org.wso2.ballerinalang.compiler.util.Names.START_TRANSACTION_COORDINATOR; import static org.wso2.ballerinalang.compiler.util.Names.TRANSACTION_INFO_RECORD; +import static org.wso2.ballerinalang.compiler.util.Names.STARTUP_CRASH_RECOVERY; /** * Class responsible for desugar transaction statements into actual Ballerina code. @@ -91,6 +92,7 @@ public class TransactionDesugar extends BLangNodeVisitor { private static final CompilerContext.Key TRANSACTION_DESUGAR_KEY = new CompilerContext.Key<>(); private static final String SHOULD_CLEANUP_SYMBOL = "$shouldCleanUp$"; + public static final String STARTUP_RECOVERY_SYMBOL = "$startupRecovery$"; private final Desugar desugar; private final SymbolTable symTable; private final SymbolResolver symResolver; @@ -162,7 +164,7 @@ public void visit(BLangTransaction transactionNode) { private BLangBlockStmt desugarTransactionBody(BLangTransaction transactionNode, SymbolEnv env, Location pos) { BLangBlockStmt transactionBlockStmt = ASTBuilderUtil.createBlockStmt(pos); transactionBlockStmt.scope = transactionNode.transactionBody.scope; - + transactionBlockStmt.stmts.add(createVarDefForStartupRecovery(env, pos)); //boolean $shouldCleanUp$ = false; BVarSymbol shouldCleanUpSymbol = new BVarSymbol(0, new Name(SHOULD_CLEANUP_SYMBOL + UNDERSCORE + uniqueId), env.scope.owner.pkgID, symTable.booleanType, env.scope.owner, pos, VIRTUAL); @@ -364,6 +366,16 @@ private BLangSimpleVariableDef createVarDefForCoordinator(SymbolEnv env, Locatio return ASTBuilderUtil.createVariableDef(pos, outputVariable); } + private BLangSimpleVariableDef createVarDefForStartupRecovery(SymbolEnv env, Location pos) { + BLangExpression invocation = createStartupCrashRecoveryStmt(pos); + BVarSymbol outputVarSymbol = new BVarSymbol(0, new Name(STARTUP_RECOVERY_SYMBOL), + env.scope.owner.pkgID, symTable.errorOrNilType, env.scope.owner, pos, VIRTUAL); + BLangSimpleVariable outputVariable = + ASTBuilderUtil.createVariable(pos, STARTUP_RECOVERY_SYMBOL, symTable.errorOrNilType, + invocation, outputVarSymbol); + return ASTBuilderUtil.createVariableDef(pos, outputVariable); + } + public void startTransactionCoordinatorOnce(SymbolEnv env, Location pos) { if (!trxCoordinatorServiceStarted) { BLangBlockFunctionBody funcBody = (BLangBlockFunctionBody) env.enclPkg.initFunction.body; @@ -484,6 +496,15 @@ private BLangInvocation createCleanupTrxStmt(Location pos, BLangLiteral trxBlock return cleanupTrxInvocation; } + public BLangInvocation createStartupCrashRecoveryStmt(Location pos) { + List args = new ArrayList<>(); + BInvokableSymbol startupCrashRecoveryInvokableSymbol = + (BInvokableSymbol) getInternalTransactionModuleInvokableSymbol(STARTUP_CRASH_RECOVERY); + BLangInvocation startupCrashRecoveryInvocation = ASTBuilderUtil. + createInvocationExprForMethod(pos, startupCrashRecoveryInvokableSymbol, args, symResolver); + return startupCrashRecoveryInvocation; + } + BLangStatementExpression invokeRollbackFunc(Location pos, BLangExpression rollbackExpr, BLangLiteral trxBlockId, BLangSimpleVarRef shouldRetryRef) { // Rollback desugar implementation diff --git a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/util/Names.java b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/util/Names.java index a57fdf0b7d68..b3a2c3dead9b 100644 --- a/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/util/Names.java +++ b/compiler/ballerina-lang/src/main/java/org/wso2/ballerinalang/compiler/util/Names.java @@ -142,6 +142,7 @@ public class Names { public static final Name CLEAN_UP_TRANSACTION = new Name("cleanupTransactionContext"); public static final Name BEGIN_REMOTE_PARTICIPANT = new Name("beginRemoteParticipant"); public static final Name START_TRANSACTION_COORDINATOR = new Name("startTransactionCoordinator"); + public static final Name STARTUP_CRASH_RECOVERY = new Name("startupCrashRecovery"); // Names related to streams public static final Name CONSTRUCT_STREAM = new Name("construct"); diff --git a/langlib/lang.transaction/src/main/ballerina/transaction.bal b/langlib/lang.transaction/src/main/ballerina/transaction.bal index ab62be24e215..39ec51fcad7e 100644 --- a/langlib/lang.transaction/src/main/ballerina/transaction.bal +++ b/langlib/lang.transaction/src/main/ballerina/transaction.bal @@ -20,6 +20,14 @@ import ballerina/jballerina.java; configurable boolean managerEnabled = false; # Config to specify transaction log directory. configurable string logBase = "transaction_log_dir"; +# Config to recovery log directory. +configurable string recoveryLogName = "recoveryLog"; +# Config to recovery log directory. +configurable string recoveryLogDir = ""; +# Config to specify the checkpoint interval. +configurable int checkpointInterval = 10000; +# Config to specify whether to delete logs or not +configurable boolean deleteOldLogs = true; # Config to specify the timeout for auto commit. configurable int transactionAutoCommitTimeout = 120; # Config to specify the timeout for cleaning up dead transactions.