Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Transaction Recovery for XA Resources #42080

Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
3106e32
Implement recovery log file system
dsplayerX Oct 25, 2023
630b4c5
Cleanup unused imports
dsplayerX Oct 25, 2023
9a56cdf
Add initialization of fileRecoveryLog to TransactionResourceManager
dsplayerX Oct 25, 2023
1cb2ff8
Add TransactionRecord to store a log record
dsplayerX Oct 27, 2023
d295306
Implement in memory log and rewrite some functions
dsplayerX Oct 27, 2023
a540ae5
Add enum for recovery statuses instead of string
dsplayerX Oct 30, 2023
f8a2b3f
Add recoveryLogName config to transaction.bal
dsplayerX Oct 30, 2023
4b6dfdb
Fix null pointer and concurrent modification exceptions with existing…
dsplayerX Oct 31, 2023
bb0df43
Updated comments
dsplayerX Oct 31, 2023
228fb0b
Add hazard state to RecoveryStatus
dsplayerX Nov 1, 2023
8f8418d
Implement checkpointing for file and in memory logs
dsplayerX Nov 1, 2023
c11d3e4
Change default checkpoint interval
dsplayerX Nov 3, 2023
a51ee30
Move parseTransactionLogRecord to TransactionLogRecord class
dsplayerX Nov 3, 2023
38b0442
Add configs for log directory and deletion of old logs
dsplayerX Nov 3, 2023
36f1c79
Add comments and cleanup code
dsplayerX Nov 3, 2023
6f78138
Add force/lazy writing to logs
dsplayerX Nov 30, 2023
c49e433
Update recovery statuses
dsplayerX Nov 30, 2023
774c527
Update Xid to gtrid and unique formatId
dsplayerX Dec 5, 2023
af0dbe4
Add a placeholder bqual
dsplayerX Dec 5, 2023
86a1e35
Implement recovery for xaResources
dsplayerX Dec 5, 2023
0d2fc9f
Refactor RecoveryStatus to RecoveryState
dsplayerX Dec 12, 2023
dab23b5
Update TransactionLogRecord to include log time
dsplayerX Dec 12, 2023
86ae597
Update createXID to use trxId and blockId
dsplayerX Dec 12, 2023
30478f8
Update hazard state log record
dsplayerX Dec 14, 2023
d8bcba2
Handle exceptions in xaResourceRecovery
dsplayerX Dec 14, 2023
152d2ab
Minor changes to logs
dsplayerX Jan 29, 2024
83fe1e2
Implement the log manager
dsplayerX Jan 29, 2024
24b1c3f
Add getDefaultFormat in XIDGenerator
dsplayerX Jan 29, 2024
81a8c92
Write terminated logs during transaction clean up
dsplayerX Jan 30, 2024
23b10e6
Update getCombinedId
dsplayerX Jan 30, 2024
330cbb2
Remove placeholder branch qualifier const
dsplayerX Jan 30, 2024
c797299
Refactor method name in LogManager
dsplayerX Jan 30, 2024
f9269fa
Changes to the recovery logic
dsplayerX Jan 30, 2024
db98c84
Add startupCrashRecover to transaction-block in desugar
dsplayerX Jan 30, 2024
bf7d6e2
Remove unused method
dsplayerX Jan 30, 2024
ea89fbb
Handle transaction recovery errors and warnings
dsplayerX Jan 30, 2024
0524380
Handle prepared transactions without a decision log record
dsplayerX Jan 31, 2024
8bcafe5
Add licence headers
dsplayerX Feb 1, 2024
4fd4179
Update switch cases in RecoveryManager
dsplayerX Feb 1, 2024
5c63a68
Fix formatting
dsplayerX Feb 1, 2024
d629dbd
Remove unnecessary conditions and variables
dsplayerX Feb 1, 2024
6d2287d
Move inline values to dedicated constants.
dsplayerX Feb 2, 2024
a33fb3c
Remove redundant checks in FileRecoveryLog
dsplayerX Feb 2, 2024
9758540
Improve errors in FileRecoveryLog
dsplayerX Feb 2, 2024
b814905
Address review suggestions
dsplayerX Feb 7, 2024
144b3d0
Replace switch cases with enhanced switch cases
dsplayerX Feb 13, 2024
87a031b
Fix import formatting
dsplayerX Feb 28, 2024
c2af735
Merge remote-tracking branch 'upstream/master' into feature-transacti…
dsplayerX Mar 4, 2024
3019503
Add doc comments
dsplayerX Mar 8, 2024
07084e3
Rework recovery logic
dsplayerX Mar 8, 2024
0d798fd
Cleanup code
dsplayerX Mar 11, 2024
73be34d
Handle invalid filenames in log directory
dsplayerX Mar 11, 2024
c07cdaa
Remove trailing whitespaces when parsing logs
dsplayerX Mar 11, 2024
eb0775a
Add cannot parse log record runtime error
dsplayerX Mar 11, 2024
9807001
Make startup recovery symbol a constant
dsplayerX Mar 12, 2024
f303138
Add method to write multiple logs to file
dsplayerX Mar 13, 2024
6c02229
Remove redundant file name check in findLatestVersion
dsplayerX Mar 13, 2024
5ae4915
Add unit tests for recovery and log managers.
dsplayerX Mar 13, 2024
ad065ed
Add unit test for xid creation
dsplayerX Mar 13, 2024
4b73d66
Rename checkpoint interval key variable
dsplayerX Mar 14, 2024
ab61d60
Add method comments and fix typos
dsplayerX Mar 21, 2024
0d38a71
Add unit tests for heuristic termination
dsplayerX Mar 21, 2024
be64fab
Make initAppendChannel thread safe
dsplayerX Apr 1, 2024
878cc73
Refactor createXid method
dsplayerX Apr 18, 2024
db1d550
Resolve merge conflicts
dsplayerX Apr 18, 2024
d6a6919
Make getFailedTransaction method concurrent safe
dsplayerX Apr 22, 2024
8232e93
Refactor handleHeuristicTermination method
dsplayerX Apr 22, 2024
927c6b5
Refactor handling recovery in resources
dsplayerX May 9, 2024
11ac5e8
Introduce FileLockAndChannel to handle append channel and locking
dsplayerX May 13, 2024
45e30eb
Rework putAll method
dsplayerX May 27, 2024
393d187
Refactor transaction log managers to use singleton pattern
dsplayerX Jun 6, 2024
5c63125
Resolve merge conflicts
gimantha Oct 9, 2024
d202e3b
Merge branch 'master' of https://github.com/ballerina-platform/baller…
gimantha Oct 9, 2024
224eea5
Merge pull request #2 from gimantha/dsplayerX-feature-transaction-rec…
dsplayerX Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,17 @@ public enum ErrorCodes implements DiagnosticCode {
REGEXP_INVALID_HEX_DIGIT("regexp.invalid.hex.digit", "RUNTIME_0120"),
CONFIG_TOML_INVALID_MODULE_STRUCTURE_WITH_VARIABLE("config.toml.invalid.module.structure.with.variable",
"RUNTIME_0121"),
EMPTY_XML_SEQUENCE_HAS_NO_ATTRIBUTES("empty.xml.sequence.no.attributes", "RUNTIME_0122");
EMPTY_XML_SEQUENCE_HAS_NO_ATTRIBUTES("empty.xml.sequence.no.attributes", "RUNTIME_0122"),

// transaction recovery errors
TRANSACTION_INVALID_CHECKPOINT_VALUE("transaction.invalid.checkpoint.value", "RUNTIME_0123"),
TRANSACTION_IN_HUERISTIC_STATE("transaction.in.heuristic.state", "RUNTIME_0124"),
TRANSACTION_IN_HAZARD_STATE("transaction.in.hazard.state", "RUNTIME_0125"),
TRANSACTION_IN_MIXED_STATE("transaction.in.mixed.state", "RUNTIME_0126"),
TRANSACTION_STARTUP_RECOVERY_FAILED("transaction.startup.recovery.failed", "RUNTIME_0127"),
TRANSACTION_CANNOT_CREATE_LOG_FILE("transaction.cannot.create.log.file", "RUNTIME_0128"),
TRANSACTION_CANNOT_COLLECT_XIDS_IN_RESOURCE("transaction.cannot.collect.xids.in.resource", "RUNTIME_0129");


private final String errorMsgKey;
private final String errorCode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
* Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org).
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.runtime.transactions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class FileRecoveryLog implements RecoveryLog {

private static final Logger log = LoggerFactory.getLogger(FileRecoveryLog.class);
private String baseFileName;
private Path recoveryLogDir;
private int checkpointInterval;
private final boolean deleteOldLogs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commonly followed convention for naming boolean variables is to use a prefix like 'has', 'is', 'can' etc. In this case you can use something similar to isOldLogsDeleted.
Check other places as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleteOldLogs in the sense whether the user has configured to delete the old logs or not. Is that okay?

private int numOfPutsSinceLastCheckpoint;
private File file;
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
private FileChannel appendChannel = null;
private Map<String, TransactionLogRecord> existingLogs;
private static final PrintStream stderr = System.err;

/**
* Initializes a new FileRecoveryLog instance with the given base file name.
*
* @param baseFileName The base name for the recovery log files.
* @param checkpointInterval The interval at which to write checkpoints to the log file.
* @param recoveryLogDir The directory to store the recovery log files in.
* @param deleteOldLogs Whether to delete old log files when creating a new one.
*/
public FileRecoveryLog(String baseFileName, int checkpointInterval, Path recoveryLogDir, boolean deleteOldLogs) {
this.baseFileName = baseFileName;
this.recoveryLogDir = recoveryLogDir;
this.deleteOldLogs = deleteOldLogs;
this.checkpointInterval = checkpointInterval;
this.existingLogs = new HashMap<>();
this.file = createNextVersion();
this.numOfPutsSinceLastCheckpoint = 0;
}

/**
* Creates the next version of the recovery log file, cleaning up the previous one.
*
* @return The newly created log file.
*/
private File createNextVersion() {
int latestVersion = findLatestVersion();
File oldFile = recoveryLogDir.resolve(baseFileName + latestVersion + ".log").toFile();
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
if (oldFile.exists()) {
existingLogs = readLogsFromFile(oldFile);
if (deleteOldLogs) {
File[] files = recoveryLogDir.toFile().listFiles(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We iterate through the directory multiple times right. In the findLatestVersion() too. Can't we have the a localVariable of log files in the directory and use it (passing to functions) rather than iterating multiple times.

(dir, name) -> name.matches(baseFileName + "(\\d+)\\.log")
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
);
for (File file : files) {
file.delete();
}
}
}
File newFile = recoveryLogDir.resolve(baseFileName + (latestVersion + 1) + ".log").toFile();
try {
Files.createDirectories(recoveryLogDir); // create directory if not exists
newFile.createNewFile();
initAppendChannel(newFile);
if (existingLogs == null) {
return newFile;
}
// write existing unfinished logs to the new file
if (!existingLogs.isEmpty()) {
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
cleanUpFinishedLogs();
if (!existingLogs.isEmpty()) {
for (Map.Entry<String, TransactionLogRecord> entry : existingLogs.entrySet()) {
put(entry.getValue());
}
existingLogs.clear();
}
}
} catch (IOException e) {
stderr.println("error: failed to create recovery log file in " + recoveryLogDir);
}
return newFile;
}

/**
* Finds the latest version of the recovery log file.
*
* @return The latest version of the recovery log file.
*/
private int findLatestVersion() {
int latestVersion = 0;
File directory = recoveryLogDir.toFile();
File[] files = directory.listFiles((dir, name) -> name.matches(baseFileName + "(\\d+)\\.log"));
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a check to see if the directory exists?

if (files == null) {
return latestVersion;
}
for (File file : files) {
String fileName = file.getName();
int version = Integer.parseInt(fileName.replaceAll(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets make sure that even a file name that doesn't adhere to the expected format will not cause any unexpected error here.

baseFileName, "").replaceAll(".log", "")
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
);
if (version > latestVersion) {
latestVersion = version;
}
}
return latestVersion;
}

/**
* Initializes the append channel for the given file.
*
* @param file The file to initialize the append channel for.
*/
private void initAppendChannel(File file) {
try {
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
appendChannel = FileChannel.open(file.toPath(), StandardOpenOption.APPEND);
FileLock lock = appendChannel.tryLock();
if (lock == null) {
stderr.println("error: failed to acquire lock on recovery log file " + file.toPath());
}
} catch (IOException e) {
stderr.println("error: failed to acquire lock on recovery log file " + file.toPath());
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public void put(TransactionLogRecord trxRecord) {
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
boolean force = !(trxRecord.getTransactionState().equals(RecoveryState.TERMINATED)); // lazy write
writeToFile(trxRecord.getTransactionLogRecord(), force);
if (checkpointInterval != -1) {
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
ifNeedWriteCheckpoint();
numOfPutsSinceLastCheckpoint++;
}
}

public Map<String, TransactionLogRecord> getPendingLogs() {
Map<String, TransactionLogRecord> pendingTransactions = new HashMap<>();
Map<String, TransactionLogRecord> transactionLogs = readLogsFromFile(file);
if (transactionLogs == null) {
return null;
}
Copy link
Contributor

@gayaldassanayake gayaldassanayake Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it's better to use Optional rather than returning null which might case NPEs. Look for other places you return and check for null as well. https://docs.oracle.com/javase/8/docs/api/java/util/Optional.html

for (Map.Entry<String, TransactionLogRecord> entry : transactionLogs.entrySet()) {
String trxId = entry.getKey();
TransactionLogRecord trxRecord = entry.getValue();
if (!trxRecord.isCompleted()) {
pendingTransactions.put(trxId, trxRecord);
}
}
return pendingTransactions;
}

/**
* Write a transaction log entry to the recovery log file.
*
* @param str the log entry to write
*/
private void writeToFile(String str, boolean force) {
if (appendChannel == null || !appendChannel.isOpen()) {
initAppendChannel(file);
}
byte[] bytes = str.getBytes();
try {
appendChannel.write(java.nio.ByteBuffer.wrap(bytes));
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
appendChannel.force(force);
} catch (IOException e) {
stderr.println("error: failed to write to recovery log file " + file.toPath());
}
}

private Map<String, TransactionLogRecord> readLogsFromFile(File file) {
Map<String, TransactionLogRecord> logMap = new HashMap<>();
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
if (!file.exists() || file.length() == 0) {
return null;
}
if (appendChannel != null) {
closeEverything();
}
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line;
while ((line = reader.readLine()) != null) {
TransactionLogRecord transactionLogRecord = TransactionLogRecord.parseTransactionLogRecord(line);
if (transactionLogRecord != null) {
// TODO: add a check here to check whether the record exists in the logMap and new state is a valid
// state from the current state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the TODO if this is not valid

logMap.put(transactionLogRecord.transactionId, transactionLogRecord);
}
}
} catch (IOException e) {
stderr.println("error: failed to read the recovery log file " + file.toPath());
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
}
return logMap;
}

private void cleanUpFinishedLogs() {
if (existingLogs.isEmpty()) {
return;
}
Iterator<Map.Entry<String, TransactionLogRecord>> iterator = existingLogs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, TransactionLogRecord> entry = iterator.next();
if (entry.getValue().isCompleted()) {
iterator.remove(); // Safely remove the entry
}
}
}

public void ifNeedWriteCheckpoint() {
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
if (numOfPutsSinceLastCheckpoint >= checkpointInterval) {
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
numOfPutsSinceLastCheckpoint = 0; // need to set here otherwise it will just keep creating new files
File newFile = createNextVersion();
file = newFile;
}
}

@Override
public void close() {
}

private void closeEverything() {
try {
appendChannel.close();
} catch (IOException e) {
// nothing to do. java cray cray. :)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org).
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.runtime.transactions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static io.ballerina.runtime.transactions.TransactionConstants.IN_MEMORY_CHECKPOINT_INTERVAL;

public class InMemoryRecoveryLog implements RecoveryLog {
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger log = LoggerFactory.getLogger(InMemoryRecoveryLog.class);
private Map<String, TransactionLogRecord> transactionLogs;
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
private int numOfPutsSinceLastCheckpoint;

public InMemoryRecoveryLog() {
this.transactionLogs = new ConcurrentHashMap<>();
this.numOfPutsSinceLastCheckpoint = 0;
}

@Override
public void put(TransactionLogRecord trxRecord) {
transactionLogs.put(trxRecord.getCombinedId(), trxRecord);
ifNeedWriteCheckpoint();
}

/**
* Write a checkpoint to the in-memory log (not needed if you don't need checkpoints).
*/
public void ifNeedWriteCheckpoint() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we make the RecoveryLog an abstract class and have common methods for FileRecoveryLog and InMemoryRecoveryLog defined there.

if (numOfPutsSinceLastCheckpoint >= IN_MEMORY_CHECKPOINT_INTERVAL) {
Map<String, TransactionLogRecord> pendingTransactions = getFailedTransactions();
transactionLogs.clear();
transactionLogs.putAll(pendingTransactions);
}
}

/**
* Retrieve all pending transactions from the in-memory log.
*
* @return Map of pending transactions
*/
public Map<String, TransactionLogRecord> getFailedTransactions() {
Map<String, TransactionLogRecord> failedTransactions = new HashMap<>();

Iterator<Map.Entry<String, TransactionLogRecord>> iterator = transactionLogs.entrySet().iterator();
while (iterator.hasNext()) {
dsplayerX marked this conversation as resolved.
Show resolved Hide resolved
Map.Entry<String, TransactionLogRecord> entry = iterator.next();
String trxId = entry.getKey();
TransactionLogRecord trxRecord = entry.getValue();

if (trxRecord.isCompleted()) {
iterator.remove();
continue;
}
failedTransactions.put(trxId, trxRecord);
iterator.remove();
}

return failedTransactions;
}

public Map<String, TransactionLogRecord> getAllLogs() {
return transactionLogs;
}

public void clearAllLogs() {
transactionLogs.clear();
}

@Override
public void close() {
}
}
Loading
Loading