Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async backups #194

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/android/src/main/java/com/reactnativeldk/LdkModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ class LdkModule(reactContext: ReactApplicationContext) : ReactContextBaseJavaMod
private val channelManagerPersister: LdkChannelManagerPersister by lazy { LdkChannelManagerPersister() }

//Config required to setup below objects
private var chainMonitor: ChainMonitor? = null
private var keysManager: KeysManager? = null
private var channelManager: ChannelManager? = null
private var userConfig: UserConfig? = null
Expand All @@ -173,6 +172,8 @@ class LdkModule(reactContext: ReactApplicationContext) : ReactContextBaseJavaMod
companion object {
lateinit var accountStoragePath: String
lateinit var channelStoragePath: String

var chainMonitor: ChainMonitor? = null
}

init {
Expand Down Expand Up @@ -1023,7 +1024,7 @@ class LdkModule(reactContext: ReactApplicationContext) : ReactContextBaseJavaMod
}

if (remotePersist) {
BackupClient.persist(BackupClient.Label.MISC(fileName), file.readBytes())
BackupClient.addToPersistQueue(BackupClient.Label.MISC(fileName), file.readBytes())
}

handleResolve(promise, LdkCallbackResponses.file_write_success)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.reactnativeldk.classes
import com.reactnativeldk.EventTypes
import com.reactnativeldk.LdkErrors
import com.reactnativeldk.LdkEventEmitter
import com.reactnativeldk.handleReject
import com.reactnativeldk.hexEncodedString
import com.reactnativeldk.hexa
import org.json.JSONObject
Expand All @@ -13,24 +11,22 @@ import java.net.URL
import java.security.MessageDigest
import java.security.SecureRandom
import java.util.Random
import java.util.UUID
import java.util.concurrent.locks.ReentrantLock
import javax.crypto.Cipher
import javax.crypto.spec.GCMParameterSpec
import javax.crypto.spec.IvParameterSpec
import javax.crypto.spec.SecretKeySpec

class BackupError : Exception() {
companion object {
val requiresSetup = RequiresSetup()
val missingBackup = MissingBackup()
val invalidServerResponse = InvalidServerResponse(0)
val decryptFailed = DecryptFailed("")
val signingError = SigningError()
val serverChallengeResponseFailed = ServerChallengeResponseFailed()
val checkError = BackupCheckError()
}
}

class InvalidNetwork() : Exception("Invalid network passed to BackupClient setup")
class RequiresSetup() : Exception("BackupClient requires setup")
class MissingBackup() : Exception("Retrieve failed. Missing backup.")
class InvalidServerResponse(code: Int) : Exception("Invalid backup server response ($code)")
Expand All @@ -44,6 +40,15 @@ class CompleteBackup(
val channelFiles: Map<String, ByteArray>
)

typealias BackupCompleteCallback = () -> Unit

class BackupQueueEntry(
val uuid: UUID,
val label: BackupClient.Label,
val bytes: ByteArray,
val callback: BackupCompleteCallback? = null
)

class BackupClient {
sealed class Label(val string: String, channelId: String = "") {
data class PING(val customName: String = "") : Label("ping")
Expand All @@ -53,6 +58,13 @@ class BackupClient {

data class MISC(val customName: String) :
Label(customName.replace(".json", "").replace(".bin", ""))

val queueId: String
get() = when (this) {
is CHANNEL_MONITOR -> "$string-$channelId"
is MISC -> "$string-$customName"
else -> string
}
}

companion object {
Expand All @@ -72,6 +84,9 @@ class BackupClient {
private var version = "v1"
private var signedMessagePrefix = "react-native-ldk backup server auth:"

private var persistQueues: HashMap<String, ArrayList<BackupQueueEntry>> = HashMap()
private var persistQueuesLock: HashMap<String, Boolean> = HashMap()

var skipRemoteBackup = true //Allow dev to opt out (for development), will not throw error when attempting to persist

private var network: String? = null
Expand Down Expand Up @@ -175,7 +190,7 @@ class BackupClient {
}

@Throws(BackupError::class)
fun persist(label: Label, bytes: ByteArray, retry: Int) {
private fun persist(label: Label, bytes: ByteArray, retry: Int) {
var attempts = 0
var persistError: Exception? = null

Expand Down Expand Up @@ -205,7 +220,7 @@ class BackupClient {
}

@Throws(BackupError::class)
fun persist(label: Label, bytes: ByteArray) {
private fun persist(label: Label, bytes: ByteArray) {
if (skipRemoteBackup) {
return
}
Expand Down Expand Up @@ -491,5 +506,69 @@ class BackupClient {
cachedBearer = CachedBearer(bearer, expires)
return bearer
}

//Backup queue management
fun addToPersistQueue(label: BackupClient.Label, bytes: ByteArray, callback: (() -> Unit)? = null) {
if (BackupClient.skipRemoteBackup) {
callback?.invoke()
LdkEventEmitter.send(
EventTypes.native_log,
"Skipping remote backup queue append for ${label.string}"
)
return
}

persistQueues[label.queueId] = persistQueues[label.queueId] ?: ArrayList()
persistQueues[label.queueId]!!.add(BackupQueueEntry(UUID.randomUUID(), label, bytes, callback))

processPersistNextInQueue(label)
}

private val backupQueueLock = ReentrantLock()
private fun processPersistNextInQueue(label: Label) {
//Check if queue is locked, if not lock it and process next in queue
var backupEntry: BackupQueueEntry? = null
backupQueueLock.lock()
try {
if (persistQueuesLock[label.queueId] == true) {
return
}

persistQueuesLock[label.queueId] = true

backupEntry = persistQueues[label.queueId]?.firstOrNull()
if (backupEntry == null) {
persistQueuesLock[label.queueId] = false
return
}
} finally {
backupQueueLock.unlock()
}

Thread {
try {
persist(backupEntry!!.label, backupEntry.bytes, 10)
backupEntry.callback?.invoke()
} catch (e: Exception) {
LdkEventEmitter.send(
EventTypes.native_log,
"Remote persist failed for ${label.string} with error ${e.message}"
)
} finally {
backupQueueLock.lock()
try {
persistQueues[label.queueId]?.remove(backupEntry)
persistQueuesLock[label.queueId] = false
} finally {
backupQueueLock.unlock()
}

processPersistNextInQueue(label)
}
}.start()
}
}
}



Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,13 @@ class LdkChannelManagerPersister: ChannelManagerConstructor.EventHandler {

override fun persist_manager(channel_manager_bytes: ByteArray?) {
if (channel_manager_bytes != null && LdkModule.accountStoragePath != "") {
BackupClient.persist(BackupClient.Label.CHANNEL_MANAGER(), channel_manager_bytes, retry = 100)
BackupClient.addToPersistQueue(BackupClient.Label.CHANNEL_MANAGER(), channel_manager_bytes) {
LdkEventEmitter.send(EventTypes.native_log, "Remote persisted channel manager to disk")
}

File(LdkModule.accountStoragePath + "/" + LdkFileNames.channel_manager.fileName).writeBytes(channel_manager_bytes)

LdkEventEmitter.send(EventTypes.native_log, "Persisted channel manager to disk")
LdkEventEmitter.send(EventTypes.native_log, "Locally persisted channel manager to disk")
LdkEventEmitter.send(EventTypes.backup, "")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.ldk.structs.Persist.PersistInterface
import java.io.File

class LdkPersister {
fun handleChannel(id: OutPoint, data: ChannelMonitor): ChannelMonitorUpdateStatus {
fun handleChannel(id: OutPoint, data: ChannelMonitor, update_id: MonitorUpdateId): ChannelMonitorUpdateStatus {
val body = Arguments.createMap()
body.putHexString("channel_id", id.to_channel_id())
body.putHexString("counterparty_node_id", data._counterparty_node_id)
Expand All @@ -24,29 +24,36 @@ class LdkPersister {

val isNew = !file.exists()

BackupClient.persist(BackupClient.Label.CHANNEL_MONITOR(channelId=channelId), data.write(), retry = 100)
file.writeBytes(data.write())

LdkEventEmitter.send(EventTypes.native_log, "Persisted channel (${id.to_channel_id().hexEncodedString()}) to disk")
LdkEventEmitter.send(EventTypes.backup, "")
BackupClient.addToPersistQueue(BackupClient.Label.CHANNEL_MONITOR(channelId=channelId), data.write()) {
file.writeBytes(data.write())

//Update chainmonitor with successful persist
val res = LdkModule.chainMonitor?.channel_monitor_updated(id, update_id)
if (res == null || !res.is_ok) {
LdkEventEmitter.send(EventTypes.native_log, "Failed to update chain monitor with persisted channel (${id.to_channel_id().hexEncodedString()})")
} else {
LdkEventEmitter.send(EventTypes.native_log, "Persisted channel (${id.to_channel_id().hexEncodedString()}) to disk")
LdkEventEmitter.send(EventTypes.backup, "")
}
}

if (isNew) {
LdkEventEmitter.send(EventTypes.new_channel, body)
}

return ChannelMonitorUpdateStatus.LDKChannelMonitorUpdateStatus_Completed
return ChannelMonitorUpdateStatus.LDKChannelMonitorUpdateStatus_InProgress
} catch (e: Exception) {
return ChannelMonitorUpdateStatus.LDKChannelMonitorUpdateStatus_UnrecoverableError
}
}

var persister = Persist.new_impl(object : PersistInterface {
override fun persist_new_channel(id: OutPoint, data: ChannelMonitor, update_id: MonitorUpdateId): ChannelMonitorUpdateStatus {
return handleChannel(id, data)
return handleChannel(id, data, update_id)
}

override fun update_persisted_channel(id: OutPoint, update: ChannelMonitorUpdate?, data: ChannelMonitor, update_id: MonitorUpdateId): ChannelMonitorUpdateStatus {
return handleChannel(id, data)
return handleChannel(id, data, update_id)
}
})
}
54 changes: 50 additions & 4 deletions lib/ios/Classes/BackupClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ class BackupClient {

private static let version = "v1"
private static let signedMessagePrefix = "react-native-ldk backup server auth:"


private static let channelManagerBackupQueue = DispatchQueue(label: "ldk.backup.client.channel-manager", qos: .userInitiated)
private static let channelMonitorBackupQueue = DispatchQueue(label: "ldk.backup.client.channel-monitor", qos: .userInitiated)
private static let miscBackupQueue = DispatchQueue(label: "ldk.backup.client.misc", qos: .background)

static var skipRemoteBackup = true //Allow dev to opt out (for development), will not throw error when attempting to persist

private static var network: String?
Expand Down Expand Up @@ -187,7 +191,7 @@ class BackupClient {
}
}

static func persist(_ label: Label, _ bytes: [UInt8], retry: Int) throws {
fileprivate static func persist(_ label: Label, _ bytes: [UInt8], retry: Int) throws {
var attempts: UInt32 = 0

var persistError: Error?
Expand All @@ -208,8 +212,8 @@ class BackupClient {
throw persistError
}
}
static func persist(_ label: Label, _ bytes: [UInt8]) throws {

fileprivate static func persist(_ label: Label, _ bytes: [UInt8]) throws {
struct PersistResponse: Codable {
let success: Bool
let signature: String
Expand Down Expand Up @@ -579,3 +583,45 @@ class BackupClient {
return fetchBearerResponse.bearer
}
}

//Backup queue management
extension BackupClient {
static func addToPersistQueue(_ label: Label, _ bytes: [UInt8], callback: (() -> Void)? = nil) {
guard !skipRemoteBackup else {
callback?()
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Skipping remote backup queue append for \(label.string)")
return
}

var backupQueue: DispatchQueue?

switch label {
case .channelManager:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Adding channel manager backup to queue")
backupQueue = channelManagerBackupQueue
break
case .channelMonitor(_):
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Adding channel monitor backup to queue")
backupQueue = channelMonitorBackupQueue
break
default:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Adding \(label.string) to misc backup queue")
backupQueue = miscBackupQueue
break
}

guard let backupQueue else {
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Failed to add \(label.string) to backup queue")
return
}

backupQueue.async {
do {
try persist(label, bytes, retry: 10)
callback?()
} catch {
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Failed to persist channel manager backup. \(error.localizedDescription)")
}
}
}
}
38 changes: 34 additions & 4 deletions lib/ios/Classes/LdkChannelManagerPersister.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class LdkChannelManagerPersister: Persister, ExtendedChannelManagerPersister {

let paymentId = Data(paymentPathFailed.getPaymentId() ?? []).hexEncodedString()
let paymentHash = Data(paymentPathFailed.getPaymentHash()).hexEncodedString()

LdkEventEmitter.shared.send(
withEvent: .channel_manager_payment_path_failed,
body: [
Expand Down Expand Up @@ -256,8 +256,36 @@ class LdkChannelManagerPersister: Persister, ExtendedChannelManagerPersister {

//Save to disk for TX history
persistPaymentClaimed(body)
default:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "ERROR: unknown LdkChannelManagerPersister.handle_event type")
return
case .BumpTransaction:
guard let bumpTransaction = event.getValueAsBumpTransaction() else {
return handleEventError(event)
}

LdkEventEmitter.shared.send(withEvent: .native_log, body: "TODO📣: BumpTransaction")

return
case .ProbeFailed:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Unused Persister event: ProbeFailed")
return
case .ProbeSuccessful:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Unused Persister event: ProbeSuccessful")
return
case .InvoiceRequestFailed:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Unused Persister event: InvoiceRequestFailed")
return
case .HTLCIntercepted:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Unused Persister event: HTLCIntercepted")
return
case .ChannelPending:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Unused Persister event: ChannelPending")
return
case .ChannelReady:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Unused Persister event: ChannelReady")
return
case .HTLCHandlingFailed:
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Unused Persister event: HTLCHandlingFailed")
return
}
}

Expand All @@ -267,7 +295,9 @@ class LdkChannelManagerPersister: Persister, ExtendedChannelManagerPersister {
}

do {
try BackupClient.persist(.channelManager, channelManager.write(), retry: 100)
BackupClient.addToPersistQueue(.channelManager, channelManager.write()) {
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Remote persisted channel manager")
}

try Data(channelManager.write()).write(to: managerStorage)
LdkEventEmitter.shared.send(withEvent: .native_log, body: "Persisted channel manager to disk")
Expand Down
Loading
Loading