Skip to content

Commit

Permalink
Integrate drpc-logs-oracle (#1096) (#335)
Browse files Browse the repository at this point in the history
* add oracle jar to build (#1096)
* add oracle config (#1096)
* add eth_getLogsEstimate (#1096)
* add scheduler for oracle (#1096)
* fix tests :[ (#1096)
  • Loading branch information
mrdimidium authored Nov 22, 2023
1 parent a7f74bb commit 5a154ac
Show file tree
Hide file tree
Showing 33 changed files with 369 additions and 73 deletions.
10 changes: 9 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,16 @@ dependencies {
implementation(variantOf(libs.netty.tcnative.boringssl) { classifier("linux-x86_64") })
implementation(variantOf(libs.netty.tcnative.boringssl) { classifier("osx-x86_64") })
implementation 'dshackle:foundation:1.0.0'

implementation files('src/main/resources/LogsOracle.jar')
}

// Enable 'Foreign Function & Memory API' (JEP 434)
// Drop after update on Java 21
tasks.withType(JavaCompile) { options.compilerArgs += "--enable-preview" }
tasks.withType(Test) { jvmArgs += "--enable-preview" }
tasks.withType(JavaExec) { jvmArgs += "--enable-preview" }

compileKotlin {
compilerOptions.jvmTarget.set(JvmTarget.JVM_20)
}
Expand Down Expand Up @@ -327,4 +335,4 @@ ktlint {
}
}

compileKotlin.dependsOn chainscodegen
compileKotlin.dependsOn chainscodegen
6 changes: 6 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.emeraldpay.dshackle.config.CacheConfig
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.CompressionConfig
import io.emeraldpay.dshackle.config.HealthConfig
import io.emeraldpay.dshackle.config.IndexConfig
import io.emeraldpay.dshackle.config.MainConfig
import io.emeraldpay.dshackle.config.MainConfigReader
import io.emeraldpay.dshackle.config.MonitoringConfig
Expand Down Expand Up @@ -130,6 +131,11 @@ open class Config(
return mainConfig.cache ?: CacheConfig()
}

@Bean
open fun indexConfig(@Autowired mainConfig: MainConfig): IndexConfig {
return mainConfig.index ?: IndexConfig()
}

@Bean
open fun signatureConfig(@Autowired mainConfig: MainConfig): SignatureConfig {
return mainConfig.signature ?: SignatureConfig()
Expand Down
21 changes: 21 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/config/IndexConfig.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.emeraldpay.dshackle.config

import io.emeraldpay.dshackle.Chain

class IndexConfig {
var items: HashMap<Chain, Index> = HashMap<Chain, Index>()

class Index(
var rpc: String,
var store: String,
var ram_limit: Long?,
)

fun isChainEnabled(chain: Chain): Boolean {
return items.containsKey(chain)
}

fun getByChain(chain: Chain): Index? {
return items.get(chain)
}
}
62 changes: 62 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/config/IndexConfigReader.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.emeraldpay.dshackle.config

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.foundation.YamlConfigReader
import org.apache.commons.lang3.StringUtils
import org.slf4j.LoggerFactory
import org.springframework.util.unit.DataSize
import org.yaml.snakeyaml.nodes.MappingNode

class IndexConfigReader : YamlConfigReader<IndexConfig>() {

companion object {
private val log = LoggerFactory.getLogger(IndexConfigReader::class.java)
}

override fun read(input: MappingNode?): IndexConfig? {
return getList<MappingNode>(input, "index")?.let { items ->
val config = IndexConfig()

items.value.map {
val blockchainRaw = getValueAsString(it, "chain")
if (blockchainRaw == null || StringUtils.isEmpty(blockchainRaw) || Global.chainById(blockchainRaw) == Chain.UNSPECIFIED) {
throw InvalidConfigYamlException(filename, it.startMark, "Invalid blockchain or not specified")
}

val blockchain = Global.chainById(blockchainRaw)
if (config.items.containsKey(blockchain)) {
throw InvalidConfigYamlException(filename, it.startMark, "Duplicated indexes")
}

val rpc = getValueAsString(it, "rpc")
if (rpc == null || StringUtils.isEmpty(rpc)) {
throw InvalidConfigYamlException(filename, it.startMark, "Invalid rpc specified")
}

val store = getValueAsString(it, "store")
if (store == null || StringUtils.isEmpty(store)) {
throw InvalidConfigYamlException(filename, it.startMark, "Invalid store directory or not specified")
}

val limit = getMapping(it, "limit")
val ram_limit = limit?.let {
val raw = getValueAsString(limit, "ram")
if (raw == null || StringUtils.isEmpty(raw)) {
return null
}

try {
DataSize.parse(raw).toBytes()
} catch (e: IllegalArgumentException) {
throw InvalidConfigYamlException(filename, it.startMark, "Invalid limit for index")
}
}

config.items.put(blockchain, IndexConfig.Index(rpc, store, ram_limit))
}

config
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class MainConfig {
var tls: AuthConfig.ServerTlsAuth? = null
var passthrough: Boolean = false
var cache: CacheConfig? = null
var index: IndexConfig? = null
var proxy: ProxyConfig? = null
var upstreams: UpstreamsConfig? = null
var tokens: TokensConfig? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class MainConfigReader(
private val optionsReader = ChainOptionsReader()
private val upstreamsConfigReader = UpstreamsConfigReader(fileResolver, optionsReader)
private val cacheConfigReader = CacheConfigReader()
private val indexConfigReader = IndexConfigReader()
private val tokensConfigReader = TokensConfigReader()
private val monitoringConfigReader = MonitoringConfigReader()
private val accessLogReader = AccessLogReader()
Expand Down Expand Up @@ -63,6 +64,9 @@ class MainConfigReader(
cacheConfigReader.read(input)?.let {
config.cache = it
}
indexConfigReader.read(input)?.let {
config.index = it
}
tokensConfigReader.read(input)?.let {
config.tokens = it
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.config.context
import io.emeraldpay.dshackle.BlockchainType.BITCOIN
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.cache.CachesFactory
import io.emeraldpay.dshackle.config.IndexConfig
import io.emeraldpay.dshackle.upstream.CallTargetsHolder
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream
Expand All @@ -26,14 +27,25 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory)
headScheduler: Scheduler,
tracer: Tracer,
multistreamEventsScheduler: Scheduler,
indexConfig: IndexConfig,
@Qualifier("logsOracleScheduler")
logsOracleScheduler: Scheduler,
): List<Multistream> {
return Chain.entries
.filterNot { it == Chain.UNSPECIFIED }
.map { chain ->
if (chain.type == BITCOIN) {
bitcoinMultistream(chain, cachesFactory, headScheduler, multistreamEventsScheduler)
} else {
genericMultistream(chain, cachesFactory, headScheduler, tracer, multistreamEventsScheduler)
genericMultistream(
chain,
cachesFactory,
headScheduler,
tracer,
multistreamEventsScheduler,
indexConfig.getByChain(chain),
logsOracleScheduler,
)
}
}
}
Expand All @@ -44,6 +56,8 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory)
headScheduler: Scheduler,
tracer: Tracer,
multistreamEventsScheduler: Scheduler,
logsOracleConfig: IndexConfig.Index?,
logsOracleScheduler: Scheduler,
): Multistream {
val name = "multi-$chain"
val cs = ChainSpecificRegistry.resolve(chain)
Expand All @@ -58,6 +72,8 @@ open class MultistreamsConfig(val beanFactory: ConfigurableListableBeanFactory)
cs.makeCachingReaderBuilder(tracer),
cs::localReaderBuilder,
cs.subscriptionBuilder(headScheduler),
logsOracleConfig,
logsOracleScheduler,
).also { register(it, name) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ open class SchedulersConfig {
return makeScheduler("head-scheduler", 4, monitoringConfig)
}

@Bean
open fun logsOracleScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("logs-oracle", 4, monitoringConfig)
}

@Bean
open fun multistreamEventsScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("events-scheduler", 4, monitoringConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.startup.configure
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.FileResolver
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.IndexConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.startup.QuorumForLabels
Expand All @@ -24,11 +25,12 @@ import java.util.concurrent.atomic.AtomicInteger
@Component
class BitcoinUpstreamCreator(
chainsConfig: ChainsConfig,
indexConfig: IndexConfig,
callTargets: CallTargetsHolder,
private val genericConnectorFactoryCreator: ConnectorFactoryCreator,
private val fileResolver: FileResolver,
private val headScheduler: Scheduler,
) : UpstreamCreator(chainsConfig, callTargets) {
) : UpstreamCreator(chainsConfig, indexConfig, callTargets) {
private var seq = AtomicInteger(0)

override fun createUpstream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.startup.configure

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.IndexConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.upstream.CallTargetsHolder
Expand All @@ -11,9 +12,10 @@ import org.springframework.stereotype.Component
@Component
class EthereumUpstreamCreator(
chainsConfig: ChainsConfig,
indexConfig: IndexConfig,
callTargets: CallTargetsHolder,
genericConnectorFactoryCreator: ConnectorFactoryCreator,
) : GenericUpstreamCreator(chainsConfig, callTargets, genericConnectorFactoryCreator) {
) : GenericUpstreamCreator(chainsConfig, indexConfig, callTargets, genericConnectorFactoryCreator) {

override fun createUpstream(
upstreamsConfig: UpstreamsConfig.Upstream<*>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.startup.configure

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.IndexConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.startup.QuorumForLabels
Expand All @@ -19,9 +20,10 @@ import kotlin.math.abs
@Component
open class GenericUpstreamCreator(
chainsConfig: ChainsConfig,
indexConfig: IndexConfig,
callTargets: CallTargetsHolder,
private val genericConnectorFactoryCreator: ConnectorFactoryCreator,
) : UpstreamCreator(chainsConfig, callTargets) {
) : UpstreamCreator(chainsConfig, indexConfig, callTargets) {
private val hashes: MutableMap<Byte, Boolean> = HashMap()

override fun createUpstream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.startup.configure
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.IndexConfig
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.upstream.CallTargetsHolder
Expand All @@ -14,6 +15,7 @@ import org.slf4j.LoggerFactory

abstract class UpstreamCreator(
private val chainsConfig: ChainsConfig,
private val indexConfig: IndexConfig,
private val callTargets: CallTargetsHolder,
) {
protected val log: Logger = LoggerFactory.getLogger(this::class.java)
Expand Down Expand Up @@ -45,7 +47,7 @@ abstract class UpstreamCreator(
protected fun buildMethods(config: UpstreamsConfig.Upstream<*>, chain: Chain): CallMethods {
return if (config.methods != null || config.methodGroups != null) {
ManagedCallMethods(
delegate = callTargets.getDefaultMethods(chain),
delegate = callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain)),
enabled = config.methods?.enabled?.map { it.name }?.toSet() ?: emptySet(),
disabled = config.methods?.disabled?.map { it.name }?.toSet() ?: emptySet(),
groupsEnabled = config.methodGroups?.enabled ?: emptySet(),
Expand All @@ -61,7 +63,7 @@ abstract class UpstreamCreator(
}
}
} else {
callTargets.getDefaultMethods(chain)
callTargets.getDefaultMethods(chain, indexConfig.isChainEnabled(chain))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import org.springframework.stereotype.Component
class CallTargetsHolder {
private val callTargets = HashMap<Chain, CallMethods>()

fun getDefaultMethods(chain: Chain): CallMethods {
return callTargets[chain] ?: return setupDefaultMethods(chain)
fun getDefaultMethods(chain: Chain, hasLogsOracle: Boolean): CallMethods {
return callTargets[chain] ?: return setupDefaultMethods(chain, hasLogsOracle)
}

private fun setupDefaultMethods(chain: Chain): CallMethods {
private fun setupDefaultMethods(chain: Chain, hasLogsOracle: Boolean): CallMethods {
val created = when (chain.type) {
BITCOIN -> DefaultBitcoinMethods()
ETHEREUM -> DefaultEthereumMethods(chain)
ETHEREUM -> DefaultEthereumMethods(chain, hasLogsOracle)
STARKNET -> DefaultStarknetMethods(chain)
POLKADOT -> DefaultPolkadotMethods()
SOLANA -> DefaultSolanaMethods()
Expand Down
42 changes: 42 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/LogsOracle.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.config.IndexConfig
import org.slf4j.LoggerFactory
import reactor.core.Disposable
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler

class LogsOracle(
private val config: IndexConfig.Index,
private val upstream: Multistream,
private val scheduler: Scheduler,
) {

private val log = LoggerFactory.getLogger(LogsOracle::class.java)

private var subscription: Disposable? = null
private val db = org.drpc.logsoracle.LogsOracle(config.store, config.store, config.ram_limit ?: 0L)

fun start() {
subscription = upstream.getHead().getFlux()
.doOnError { t -> log.warn("Failed to subscribe head for oracle", t) }
.subscribe { println(it.height); db.UpdateHeight(it.height) }
}

fun stop() {
db.close()

subscription?.dispose()
subscription = null
}

fun estimate(
fromBlock: Long?,
toBlock: Long?,
address: List<String>,
topics: List<List<String>>,
): Mono<Long> {
return Mono.fromCallable { db.Query(fromBlock, toBlock, address, topics) }
.subscribeOn(scheduler)
}
}
Loading

0 comments on commit 5a154ac

Please sign in to comment.