From 316ba10ce33a8770ffd296bd6d998f28ee8445a5 Mon Sep 17 00:00:00 2001 From: Marcus Vinicius Girolneto Sousa Date: Fri, 6 Dec 2024 09:52:35 -0300 Subject: [PATCH] feat: moving integrationnet daemon to l0 to check for valid updates --- .../shared_data/daemons/DaemonApis.scala | 9 +++-- .../IntegrationnetNodesOperatorsFetcher.scala | 39 +++++++++++++++++-- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/modules/shared_data/src/main/scala/org/elpaca_metagraph/shared_data/daemons/DaemonApis.scala b/modules/shared_data/src/main/scala/org/elpaca_metagraph/shared_data/daemons/DaemonApis.scala index 640c6bf..2c393d0 100755 --- a/modules/shared_data/src/main/scala/org/elpaca_metagraph/shared_data/daemons/DaemonApis.scala +++ b/modules/shared_data/src/main/scala/org/elpaca_metagraph/shared_data/daemons/DaemonApis.scala @@ -49,8 +49,7 @@ object DaemonApis { logger.info("Spawning L1 daemons") >> spawnExolixDaemon(config, signer) >> - spawnSimplexDaemon(config, signer) >> - spawnIntegrationnetNodesOperatorsDaemon(config, signer) + spawnSimplexDaemon(config, signer) } override def spawnL0Daemons(calculatedStateService: CalculatedStateService[F]): F[Unit] = @@ -62,6 +61,7 @@ object DaemonApis { spawnWalletCreationDaemon(config, signer, calculatedStateService) >> spawnInflowTransactionsDaemon(config, signer, calculatedStateService) >> spawnOutflowTransactionsDaemon(config, signer, calculatedStateService) >> + spawnIntegrationnetNodesOperatorsDaemon(config, signer, calculatedStateService) >> spawnXDaemon(config, signer, calculatedStateService) // spawnYouTubeDaemon(config, signer, calculatedStateService) } @@ -97,9 +97,10 @@ object DaemonApis { private def spawnIntegrationnetNodesOperatorsDaemon( config: ApplicationConfig, - signer: Signer[F] + signer: Signer[F], + calculatedStateService: CalculatedStateService[F] ): F[Unit] = { - val integrationnetNodesOperatorsFetcher = IntegrationnetNodesOperatorsFetcher.make[F](config) + val integrationnetNodesOperatorsFetcher = IntegrationnetNodesOperatorsFetcher.make[F](config, calculatedStateService) val integrationnetNodesOperatorsProcessor = Processor.make[F](integrationnetNodesOperatorsFetcher, signer) logger.info("Spawning IntegrationnetNodeOperators daemon") >> spawn(integrationnetNodesOperatorsProcessor, config.integrationnetNodesOperatorsDaemon.idleTime).start diff --git a/modules/shared_data/src/main/scala/org/elpaca_metagraph/shared_data/daemons/fetcher/IntegrationnetNodesOperatorsFetcher.scala b/modules/shared_data/src/main/scala/org/elpaca_metagraph/shared_data/daemons/fetcher/IntegrationnetNodesOperatorsFetcher.scala index 6df3f1f..ac58e4c 100755 --- a/modules/shared_data/src/main/scala/org/elpaca_metagraph/shared_data/daemons/fetcher/IntegrationnetNodesOperatorsFetcher.scala +++ b/modules/shared_data/src/main/scala/org/elpaca_metagraph/shared_data/daemons/fetcher/IntegrationnetNodesOperatorsFetcher.scala @@ -5,12 +5,15 @@ import cats.syntax.all._ import fs2.io.net.Network import io.circe.generic.auto._ import org.elpaca_metagraph.shared_data.app.ApplicationConfig +import org.elpaca_metagraph.shared_data.calculated_state.CalculatedStateService import org.elpaca_metagraph.shared_data.types.DataUpdates.{ElpacaUpdate, IntegrationnetNodeOperatorUpdate} -import org.elpaca_metagraph.shared_data.types.IntegrationnetOperators.{IntegrationnetOperatorsApiResponse, OperatorInQueue} +import org.elpaca_metagraph.shared_data.types.IntegrationnetOperators.{IntegrationnetNodeOperatorDataSourceAddress, IntegrationnetOperatorsApiResponse, OperatorInQueue} +import org.elpaca_metagraph.shared_data.types.States.{DataSourceType, IntegrationnetNodeOperatorDataSource} import org.http4s._ import org.http4s.circe._ import org.http4s.client.Client import org.tessellation.node.shared.resources.MkHttpClient +import org.tessellation.schema.address.Address import org.typelevel.ci.CIString import org.typelevel.log4cats.SelfAwareStructuredLogger import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -19,10 +22,25 @@ import java.time.LocalDateTime object IntegrationnetNodesOperatorsFetcher { - def make[F[_] : Async : Network](applicationConfig: ApplicationConfig): Fetcher[F] = + def make[F[_] : Async : Network]( + applicationConfig : ApplicationConfig, + calculatedStateService: CalculatedStateService[F] + ): Fetcher[F] = new Fetcher[F] { private val logger: SelfAwareStructuredLogger[F] = Slf4jLogger.getLoggerFromClass(IntegrationnetNodesOperatorsFetcher.getClass) + private def calculateDaysInQueue(existing: IntegrationnetNodeOperatorDataSourceAddress, operatorInQueue: OperatorInQueue): Long = + operatorInQueue.daysInQueue - existing.daysInQueue + + private def getIntegrationnetOperatorsDataSourceAddresses: F[Map[Address, IntegrationnetNodeOperatorDataSourceAddress]] = { + calculatedStateService.get.map { calculatedState => + calculatedState.state.dataSources.get(DataSourceType.IntegrationnetNodeOperator) match { + case Some(integrationnetNodeOperatorDataSource: IntegrationnetNodeOperatorDataSource) => integrationnetNodeOperatorDataSource + case _ => IntegrationnetNodeOperatorDataSource(Map.empty) + } + }.map(_.addresses) + } + def fetchOperatorsInQueue(url: String): F[IntegrationnetOperatorsApiResponse] = { val integrationnetOperatorsConfig = applicationConfig.integrationnetNodesOperatorsDaemon val authorizationHeader = CIString("Authorization") @@ -49,8 +67,21 @@ object IntegrationnetNodesOperatorsFetcher { logger.error(s"Error when fetching from Lattice Integrationnet operators API: ${err.getMessage}") .as(IntegrationnetOperatorsApiResponse(List.empty[OperatorInQueue])) } - _ <- logger.info(s"Found ${integrationnetOperatorsApiResponse.data.length} operators in queue") - dataUpdates = integrationnetOperatorsApiResponse.data.foldLeft(List.empty[ElpacaUpdate]) { (acc, operatorInQueue) => + + integrationnetOperatorsDataSourceAddresses <- getIntegrationnetOperatorsDataSourceAddresses + + _ <- logger.info(s"Fetched ${integrationnetOperatorsApiResponse.data} operators in queue from Lattice API") + integrationnetOperatorsApiResponseFiltered = integrationnetOperatorsApiResponse.data.filter { operatorInQueue => + integrationnetOperatorsDataSourceAddresses.get(operatorInQueue.walletAddress) match { + case None => true + case Some(value) => + val daysInQueue = calculateDaysInQueue(value, operatorInQueue) + daysInQueue > 0 + } + } + _ <- logger.info(s"Filtered ${integrationnetOperatorsApiResponseFiltered.length} valid operators to be rewarded") + + dataUpdates = integrationnetOperatorsApiResponseFiltered.foldLeft(List.empty[ElpacaUpdate]) { (acc, operatorInQueue) => acc :+ IntegrationnetNodeOperatorUpdate(operatorInQueue.walletAddress, operatorInQueue) } } yield dataUpdates