-
Notifications
You must be signed in to change notification settings - Fork 758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Transaction Recovery for XA Resources #42080
Changes from 41 commits
3106e32
630b4c5
9a56cdf
1cb2ff8
d295306
a540ae5
f8a2b3f
4b6dfdb
bb0df43
228fb0b
8f8418d
c11d3e4
a51ee30
38b0442
36f1c79
6f78138
c49e433
774c527
af0dbe4
86a1e35
0d2fc9f
dab23b5
86ae597
30478f8
d8bcba2
152d2ab
83fe1e2
24b1c3f
81a8c92
23b10e6
330cbb2
c797299
f9269fa
db98c84
bf7d6e2
ea89fbb
0524380
8bcafe5
4fd4179
5c63a68
d629dbd
6d2287d
a33fb3c
9758540
b814905
144b3d0
87a031b
c2af735
3019503
07084e3
0d798fd
73be34d
c07cdaa
eb0775a
9807001
f303138
6c02229
5ae4915
ad065ed
4b73d66
ab61d60
0d38a71
be64fab
878cc73
db1d550
d6a6919
8232e93
927c6b5
11ac5e8
45e30eb
393d187
5c63125
d202e3b
224eea5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
/* | ||
* Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). | ||
* | ||
* WSO2 Inc. licenses this file to you under the Apache License, | ||
* Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.ballerina.runtime.transactions; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.File; | ||
import java.io.FileReader; | ||
import java.io.IOException; | ||
import java.io.PrintStream; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.channels.FileLock; | ||
import java.nio.file.Path; | ||
import java.nio.file.Files; | ||
import java.nio.file.StandardOpenOption; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.Map; | ||
|
||
public class FileRecoveryLog implements RecoveryLog { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(FileRecoveryLog.class); | ||
private String baseFileName; | ||
private Path recoveryLogDir; | ||
private int checkpointInterval; | ||
private final boolean deleteOldLogs; | ||
private int numOfPutsSinceLastCheckpoint; | ||
private File file; | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private FileChannel appendChannel = null; | ||
private Map<String, TransactionLogRecord> existingLogs; | ||
private static final PrintStream stderr = System.err; | ||
|
||
/** | ||
* Initializes a new FileRecoveryLog instance with the given base file name. | ||
* | ||
* @param baseFileName The base name for the recovery log files. | ||
* @param checkpointInterval The interval at which to write checkpoints to the log file. | ||
* @param recoveryLogDir The directory to store the recovery log files in. | ||
* @param deleteOldLogs Whether to delete old log files when creating a new one. | ||
*/ | ||
public FileRecoveryLog(String baseFileName, int checkpointInterval, Path recoveryLogDir, boolean deleteOldLogs) { | ||
this.baseFileName = baseFileName; | ||
this.recoveryLogDir = recoveryLogDir; | ||
this.deleteOldLogs = deleteOldLogs; | ||
this.checkpointInterval = checkpointInterval; | ||
this.existingLogs = new HashMap<>(); | ||
this.file = createNextVersion(); | ||
this.numOfPutsSinceLastCheckpoint = 0; | ||
} | ||
|
||
/** | ||
* Creates the next version of the recovery log file, cleaning up the previous one. | ||
* | ||
* @return The newly created log file. | ||
*/ | ||
private File createNextVersion() { | ||
int latestVersion = findLatestVersion(); | ||
File oldFile = recoveryLogDir.resolve(baseFileName + latestVersion + ".log").toFile(); | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (oldFile.exists()) { | ||
existingLogs = readLogsFromFile(oldFile); | ||
if (deleteOldLogs) { | ||
File[] files = recoveryLogDir.toFile().listFiles( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We iterate through the directory multiple times right. In the |
||
(dir, name) -> name.matches(baseFileName + "(\\d+)\\.log") | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
for (File file : files) { | ||
file.delete(); | ||
} | ||
} | ||
} | ||
File newFile = recoveryLogDir.resolve(baseFileName + (latestVersion + 1) + ".log").toFile(); | ||
try { | ||
Files.createDirectories(recoveryLogDir); // create directory if not exists | ||
newFile.createNewFile(); | ||
initAppendChannel(newFile); | ||
if (existingLogs == null) { | ||
return newFile; | ||
} | ||
// write existing unfinished logs to the new file | ||
if (!existingLogs.isEmpty()) { | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cleanUpFinishedLogs(); | ||
if (!existingLogs.isEmpty()) { | ||
for (Map.Entry<String, TransactionLogRecord> entry : existingLogs.entrySet()) { | ||
put(entry.getValue()); | ||
} | ||
existingLogs.clear(); | ||
} | ||
} | ||
} catch (IOException e) { | ||
stderr.println("error: failed to create recovery log file in " + recoveryLogDir); | ||
} | ||
return newFile; | ||
} | ||
|
||
/** | ||
* Finds the latest version of the recovery log file. | ||
* | ||
* @return The latest version of the recovery log file. | ||
*/ | ||
private int findLatestVersion() { | ||
int latestVersion = 0; | ||
File directory = recoveryLogDir.toFile(); | ||
File[] files = directory.listFiles((dir, name) -> name.matches(baseFileName + "(\\d+)\\.log")); | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need a check to see if the directory exists? |
||
if (files == null) { | ||
return latestVersion; | ||
} | ||
for (File file : files) { | ||
String fileName = file.getName(); | ||
int version = Integer.parseInt(fileName.replaceAll( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets make sure that even a file name that doesn't adhere to the expected format will not cause any unexpected error here. |
||
baseFileName, "").replaceAll(".log", "") | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
if (version > latestVersion) { | ||
latestVersion = version; | ||
} | ||
} | ||
return latestVersion; | ||
} | ||
|
||
/** | ||
* Initializes the append channel for the given file. | ||
* | ||
* @param file The file to initialize the append channel for. | ||
*/ | ||
private void initAppendChannel(File file) { | ||
try { | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
appendChannel = FileChannel.open(file.toPath(), StandardOpenOption.APPEND); | ||
FileLock lock = appendChannel.tryLock(); | ||
if (lock == null) { | ||
stderr.println("error: failed to acquire lock on recovery log file " + file.toPath()); | ||
} | ||
} catch (IOException e) { | ||
stderr.println("error: failed to acquire lock on recovery log file " + file.toPath()); | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
@Override | ||
public void put(TransactionLogRecord trxRecord) { | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
boolean force = !(trxRecord.getTransactionState().equals(RecoveryState.TERMINATED)); // lazy write | ||
writeToFile(trxRecord.getTransactionLogRecord(), force); | ||
if (checkpointInterval != -1) { | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ifNeedWriteCheckpoint(); | ||
numOfPutsSinceLastCheckpoint++; | ||
} | ||
} | ||
|
||
public Map<String, TransactionLogRecord> getPendingLogs() { | ||
Map<String, TransactionLogRecord> pendingTransactions = new HashMap<>(); | ||
Map<String, TransactionLogRecord> transactionLogs = readLogsFromFile(file); | ||
if (transactionLogs == null) { | ||
return null; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO it's better to use |
||
for (Map.Entry<String, TransactionLogRecord> entry : transactionLogs.entrySet()) { | ||
String trxId = entry.getKey(); | ||
TransactionLogRecord trxRecord = entry.getValue(); | ||
if (!trxRecord.isCompleted()) { | ||
pendingTransactions.put(trxId, trxRecord); | ||
} | ||
} | ||
return pendingTransactions; | ||
} | ||
|
||
/** | ||
* Write a transaction log entry to the recovery log file. | ||
* | ||
* @param str the log entry to write | ||
*/ | ||
private void writeToFile(String str, boolean force) { | ||
if (appendChannel == null || !appendChannel.isOpen()) { | ||
initAppendChannel(file); | ||
} | ||
byte[] bytes = str.getBytes(); | ||
try { | ||
appendChannel.write(java.nio.ByteBuffer.wrap(bytes)); | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
appendChannel.force(force); | ||
} catch (IOException e) { | ||
stderr.println("error: failed to write to recovery log file " + file.toPath()); | ||
} | ||
} | ||
|
||
private Map<String, TransactionLogRecord> readLogsFromFile(File file) { | ||
Map<String, TransactionLogRecord> logMap = new HashMap<>(); | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (!file.exists() || file.length() == 0) { | ||
return null; | ||
} | ||
if (appendChannel != null) { | ||
closeEverything(); | ||
} | ||
try (BufferedReader reader = new BufferedReader(new FileReader(file))) { | ||
String line; | ||
while ((line = reader.readLine()) != null) { | ||
TransactionLogRecord transactionLogRecord = TransactionLogRecord.parseTransactionLogRecord(line); | ||
if (transactionLogRecord != null) { | ||
// TODO: add a check here to check whether the record exists in the logMap and new state is a valid | ||
// state from the current state | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove the TODO if this is not valid |
||
logMap.put(transactionLogRecord.transactionId, transactionLogRecord); | ||
} | ||
} | ||
} catch (IOException e) { | ||
stderr.println("error: failed to read the recovery log file " + file.toPath()); | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return logMap; | ||
} | ||
|
||
private void cleanUpFinishedLogs() { | ||
if (existingLogs.isEmpty()) { | ||
return; | ||
} | ||
Iterator<Map.Entry<String, TransactionLogRecord>> iterator = existingLogs.entrySet().iterator(); | ||
while (iterator.hasNext()) { | ||
Map.Entry<String, TransactionLogRecord> entry = iterator.next(); | ||
if (entry.getValue().isCompleted()) { | ||
iterator.remove(); // Safely remove the entry | ||
} | ||
} | ||
} | ||
|
||
public void ifNeedWriteCheckpoint() { | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (numOfPutsSinceLastCheckpoint >= checkpointInterval) { | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
numOfPutsSinceLastCheckpoint = 0; // need to set here otherwise it will just keep creating new files | ||
File newFile = createNextVersion(); | ||
file = newFile; | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
|
||
private void closeEverything() { | ||
try { | ||
appendChannel.close(); | ||
} catch (IOException e) { | ||
// nothing to do. java cray cray. :) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Copyright (c) 2024, WSO2 Inc. (http://www.wso2.org). | ||
* | ||
* WSO2 Inc. licenses this file to you under the Apache License, | ||
* Version 2.0 (the "License"); you may not use this file except | ||
* in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.ballerina.runtime.transactions; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
import static io.ballerina.runtime.transactions.TransactionConstants.IN_MEMORY_CHECKPOINT_INTERVAL; | ||
|
||
public class InMemoryRecoveryLog implements RecoveryLog { | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private static final Logger log = LoggerFactory.getLogger(InMemoryRecoveryLog.class); | ||
private Map<String, TransactionLogRecord> transactionLogs; | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private int numOfPutsSinceLastCheckpoint; | ||
|
||
public InMemoryRecoveryLog() { | ||
this.transactionLogs = new ConcurrentHashMap<>(); | ||
this.numOfPutsSinceLastCheckpoint = 0; | ||
} | ||
|
||
@Override | ||
public void put(TransactionLogRecord trxRecord) { | ||
transactionLogs.put(trxRecord.getCombinedId(), trxRecord); | ||
ifNeedWriteCheckpoint(); | ||
} | ||
|
||
/** | ||
* Write a checkpoint to the in-memory log (not needed if you don't need checkpoints). | ||
*/ | ||
public void ifNeedWriteCheckpoint() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we make the |
||
if (numOfPutsSinceLastCheckpoint >= IN_MEMORY_CHECKPOINT_INTERVAL) { | ||
Map<String, TransactionLogRecord> pendingTransactions = getFailedTransactions(); | ||
transactionLogs.clear(); | ||
transactionLogs.putAll(pendingTransactions); | ||
} | ||
} | ||
|
||
/** | ||
* Retrieve all pending transactions from the in-memory log. | ||
* | ||
* @return Map of pending transactions | ||
*/ | ||
public Map<String, TransactionLogRecord> getFailedTransactions() { | ||
Map<String, TransactionLogRecord> failedTransactions = new HashMap<>(); | ||
|
||
Iterator<Map.Entry<String, TransactionLogRecord>> iterator = transactionLogs.entrySet().iterator(); | ||
while (iterator.hasNext()) { | ||
dsplayerX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Map.Entry<String, TransactionLogRecord> entry = iterator.next(); | ||
String trxId = entry.getKey(); | ||
TransactionLogRecord trxRecord = entry.getValue(); | ||
|
||
if (trxRecord.isCompleted()) { | ||
iterator.remove(); | ||
continue; | ||
} | ||
failedTransactions.put(trxId, trxRecord); | ||
iterator.remove(); | ||
} | ||
|
||
return failedTransactions; | ||
} | ||
|
||
public Map<String, TransactionLogRecord> getAllLogs() { | ||
return transactionLogs; | ||
} | ||
|
||
public void clearAllLogs() { | ||
transactionLogs.clear(); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The commonly followed convention for naming boolean variables is to use a prefix like 'has', 'is', 'can' etc. In this case you can use something similar to
isOldLogsDeleted
.Check other places as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleteOldLogs
in the sense whether the user has configured to delete the old logs or not. Is that okay?