Skip to content

Commit

Permalink
feat: moving integrationnet daemon to l0 to check for valid updates
Browse files Browse the repository at this point in the history
  • Loading branch information
IPadawans committed Dec 6, 2024
1 parent 659c44e commit 316ba10
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ object DaemonApis {

logger.info("Spawning L1 daemons") >>
spawnExolixDaemon(config, signer) >>
spawnSimplexDaemon(config, signer) >>
spawnIntegrationnetNodesOperatorsDaemon(config, signer)
spawnSimplexDaemon(config, signer)
}

override def spawnL0Daemons(calculatedStateService: CalculatedStateService[F]): F[Unit] =
Expand All @@ -62,6 +61,7 @@ object DaemonApis {
spawnWalletCreationDaemon(config, signer, calculatedStateService) >>
spawnInflowTransactionsDaemon(config, signer, calculatedStateService) >>
spawnOutflowTransactionsDaemon(config, signer, calculatedStateService) >>
spawnIntegrationnetNodesOperatorsDaemon(config, signer, calculatedStateService) >>
spawnXDaemon(config, signer, calculatedStateService)
// spawnYouTubeDaemon(config, signer, calculatedStateService)
}
Expand Down Expand Up @@ -97,9 +97,10 @@ object DaemonApis {

private def spawnIntegrationnetNodesOperatorsDaemon(
config: ApplicationConfig,
signer: Signer[F]
signer: Signer[F],
calculatedStateService: CalculatedStateService[F]
): F[Unit] = {
val integrationnetNodesOperatorsFetcher = IntegrationnetNodesOperatorsFetcher.make[F](config)
val integrationnetNodesOperatorsFetcher = IntegrationnetNodesOperatorsFetcher.make[F](config, calculatedStateService)
val integrationnetNodesOperatorsProcessor = Processor.make[F](integrationnetNodesOperatorsFetcher, signer)
logger.info("Spawning IntegrationnetNodeOperators daemon") >>
spawn(integrationnetNodesOperatorsProcessor, config.integrationnetNodesOperatorsDaemon.idleTime).start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import cats.syntax.all._
import fs2.io.net.Network
import io.circe.generic.auto._
import org.elpaca_metagraph.shared_data.app.ApplicationConfig
import org.elpaca_metagraph.shared_data.calculated_state.CalculatedStateService
import org.elpaca_metagraph.shared_data.types.DataUpdates.{ElpacaUpdate, IntegrationnetNodeOperatorUpdate}
import org.elpaca_metagraph.shared_data.types.IntegrationnetOperators.{IntegrationnetOperatorsApiResponse, OperatorInQueue}
import org.elpaca_metagraph.shared_data.types.IntegrationnetOperators.{IntegrationnetNodeOperatorDataSourceAddress, IntegrationnetOperatorsApiResponse, OperatorInQueue}
import org.elpaca_metagraph.shared_data.types.States.{DataSourceType, IntegrationnetNodeOperatorDataSource}
import org.http4s._
import org.http4s.circe._
import org.http4s.client.Client
import org.tessellation.node.shared.resources.MkHttpClient
import org.tessellation.schema.address.Address
import org.typelevel.ci.CIString
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
Expand All @@ -19,10 +22,25 @@ import java.time.LocalDateTime

object IntegrationnetNodesOperatorsFetcher {

def make[F[_] : Async : Network](applicationConfig: ApplicationConfig): Fetcher[F] =
def make[F[_] : Async : Network](
applicationConfig : ApplicationConfig,
calculatedStateService: CalculatedStateService[F]
): Fetcher[F] =
new Fetcher[F] {
private val logger: SelfAwareStructuredLogger[F] = Slf4jLogger.getLoggerFromClass(IntegrationnetNodesOperatorsFetcher.getClass)

private def calculateDaysInQueue(existing: IntegrationnetNodeOperatorDataSourceAddress, operatorInQueue: OperatorInQueue): Long =
operatorInQueue.daysInQueue - existing.daysInQueue

private def getIntegrationnetOperatorsDataSourceAddresses: F[Map[Address, IntegrationnetNodeOperatorDataSourceAddress]] = {
calculatedStateService.get.map { calculatedState =>
calculatedState.state.dataSources.get(DataSourceType.IntegrationnetNodeOperator) match {
case Some(integrationnetNodeOperatorDataSource: IntegrationnetNodeOperatorDataSource) => integrationnetNodeOperatorDataSource
case _ => IntegrationnetNodeOperatorDataSource(Map.empty)
}
}.map(_.addresses)
}

def fetchOperatorsInQueue(url: String): F[IntegrationnetOperatorsApiResponse] = {
val integrationnetOperatorsConfig = applicationConfig.integrationnetNodesOperatorsDaemon
val authorizationHeader = CIString("Authorization")
Expand All @@ -49,8 +67,21 @@ object IntegrationnetNodesOperatorsFetcher {
logger.error(s"Error when fetching from Lattice Integrationnet operators API: ${err.getMessage}")
.as(IntegrationnetOperatorsApiResponse(List.empty[OperatorInQueue]))
}
_ <- logger.info(s"Found ${integrationnetOperatorsApiResponse.data.length} operators in queue")
dataUpdates = integrationnetOperatorsApiResponse.data.foldLeft(List.empty[ElpacaUpdate]) { (acc, operatorInQueue) =>

integrationnetOperatorsDataSourceAddresses <- getIntegrationnetOperatorsDataSourceAddresses

_ <- logger.info(s"Fetched ${integrationnetOperatorsApiResponse.data} operators in queue from Lattice API")
integrationnetOperatorsApiResponseFiltered = integrationnetOperatorsApiResponse.data.filter { operatorInQueue =>
integrationnetOperatorsDataSourceAddresses.get(operatorInQueue.walletAddress) match {
case None => true
case Some(value) =>
val daysInQueue = calculateDaysInQueue(value, operatorInQueue)
daysInQueue > 0
}
}
_ <- logger.info(s"Filtered ${integrationnetOperatorsApiResponseFiltered.length} valid operators to be rewarded")

dataUpdates = integrationnetOperatorsApiResponseFiltered.foldLeft(List.empty[ElpacaUpdate]) { (acc, operatorInQueue) =>
acc :+ IntegrationnetNodeOperatorUpdate(operatorInQueue.walletAddress, operatorInQueue)
}
} yield dataUpdates
Expand Down

0 comments on commit 316ba10

Please sign in to comment.