diff --git a/emerald-grpc b/emerald-grpc index 52cd5c929..f827eae17 160000 --- a/emerald-grpc +++ b/emerald-grpc @@ -1 +1 @@ -Subproject commit 52cd5c929de2db3c684bfa0354b25d3c55d5d00b +Subproject commit f827eae17acdb8077091789a6b9d7f2569d7ce8c diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/Describe.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/Describe.kt index 35075554b..354c05ff6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/Describe.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/Describe.kt @@ -46,6 +46,8 @@ class Describe( .addAllSupportedSubscriptions(chainUpstreams.getEgressSubscription().getAvailableTopics()) .setStatus(status) .setCurrentHeight(chainUpstreams.getHead().getCurrentHeight() ?: 0) + .setCurrentLowerBlock(chainUpstreams.getLowerBlock().blockNumber) + .setCurrentLowerSlot(chainUpstreams.getLowerBlock().slot ?: 0) chainUpstreams.getQuorumLabels() .forEach { node -> val nodeDetails = BlockchainOuterClass.NodeDetails.newBuilder() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt index 7e7336f6f..d13787e0d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt @@ -21,6 +21,7 @@ import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.api.proto.Common import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.MultistreamHolder import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -39,20 +40,23 @@ class StreamHead( return requestMono.map { request -> Chain.byId(request.type.number) }.flatMapMany { chain -> - multistreamHolder.getUpstream(chain).getHead() + val ms = multistreamHolder.getUpstream(chain) + ms.getHead() .getFlux() - .map { asProto(chain, it!!) } + .map { asProto(ms, chain, it!!) } .onErrorContinue { t, _ -> log.warn("Head subscription error", t) } } } - fun asProto(chain: Chain, block: BlockContainer): BlockchainOuterClass.ChainHead { + fun asProto(ms: Multistream, chain: Chain, block: BlockContainer): BlockchainOuterClass.ChainHead { return BlockchainOuterClass.ChainHead.newBuilder() .setChainValue(chain.id) .setHeight(block.height) .setSlot(block.slot) + .setCurrentLowerBlock(ms.getLowerBlock().blockNumber) + .setCurrentLowerSlot(ms.getLowerBlock().slot ?: 0) .setTimestamp(block.timestamp.toEpochMilli()) .setWeight(ByteString.copyFrom(block.difficulty.toByteArray())) .setBlockId(block.hash.toHex()) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt index eff68248e..7ea522f4f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt @@ -87,6 +87,7 @@ open class GenericUpstreamCreator( connectorFactory, cs::validator, cs::labelDetector, + cs::lowerBoundBlockDetector, ) upstream.start() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/LowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/LowerBoundBlockDetector.kt new file mode 100644 index 000000000..1a0ccc32a --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/LowerBoundBlockDetector.kt @@ -0,0 +1,68 @@ +package io.emeraldpay.dshackle.upstream + +import io.emeraldpay.dshackle.Chain +import org.slf4j.LoggerFactory +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.time.Duration +import java.util.concurrent.atomic.AtomicReference + +typealias LowerBoundBlockDetectorBuilder = (Chain, Upstream) -> LowerBoundBlockDetector + +fun Long.toHex() = "0x${this.toString(16)}" + +abstract class LowerBoundBlockDetector( + private val chain: Chain, + private val upstream: Upstream, +) { + private val currentLowerBlock = AtomicReference(LowerBlockData.default()) + + private val log = LoggerFactory.getLogger(this::class.java) + + fun lowerBlock(): Flux { + return Flux.interval( + Duration.ofSeconds(15), + Duration.ofSeconds(60), + ) + .flatMap { lowerBlockDetect() } + .filter { it.blockNumber > currentLowerBlock.get().blockNumber } + .map { + log.info("Lower block of ${upstream.getId()} $chain: block height - {}, slot - {}", it.blockNumber, it.slot ?: "NA") + + currentLowerBlock.set(it) + it + } + } + + fun getCurrentLowerBlock(): LowerBlockData = currentLowerBlock.get() + + protected abstract fun lowerBlockDetect(): Mono + + data class LowerBlockData( + val blockNumber: Long, + val slot: Long?, + ) : Comparable { + constructor(blockNumber: Long) : this(blockNumber, null) + + companion object { + fun default() = LowerBlockData(0, 0) + } + + override fun compareTo(other: LowerBlockData): Int { + return this.blockNumber.compareTo(other.blockNumber) + } + } + + data class LowerBoundData( + val left: Long, + val right: Long, + val current: Long, + val found: Boolean, + ) { + constructor(left: Long, right: Long) : this(left, right, 0, false) + + constructor(left: Long, right: Long, current: Long) : this(left, right, current, false) + + constructor(current: Long, found: Boolean) : this(0, 0, current, found) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 156a2770f..90a0faaaa 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -78,6 +78,9 @@ abstract class Multistream( @Volatile private var capabilities: Set = emptySet() + @Volatile + private var lowerBlock: LowerBoundBlockDetector.LowerBlockData = LowerBoundBlockDetector.LowerBlockData.default() + @Volatile private var quorumLabels: List? = null private val meters: MutableMap> = HashMap() @@ -227,6 +230,10 @@ abstract class Multistream( } } quorumLabels = getQuorumLabels(availableUpstreams) + availableUpstreams + .filter { it.getLowerBlock() != LowerBoundBlockDetector.LowerBlockData.default() } + .minOfOrNull { it.getLowerBlock() } + ?.let { lowerBlock = it } when { upstreams.size == 1 -> { lagObserver?.stop() @@ -313,11 +320,11 @@ abstract class Multistream( started = true } + override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData = lowerBlock + private fun observeUpstreamsStatuses() { subscribeAddedUpstreams() - .distinctUntilChanged { - it.getId() - }.flatMap { upstream -> + .flatMap { upstream -> val statusStream = upstream.observeStatus() .map { UpstreamChangeEvent(this.chain, upstream, UpstreamChangeEvent.ChangeType.UPDATED) } val stateStream = upstream.observeState() @@ -407,9 +414,10 @@ abstract class Multistream( val weak = getUpstreams() .filter { it.getStatus() != UpstreamAvailability.OK } .joinToString(", ") { it.getId() } + val lowerBlockData = "[height=${lowerBlock.blockNumber}, slot=${lowerBlock.slot ?: "NA"}]" val instance = System.identityHashCode(this).toString(16) - log.info("State of ${chain.chainCode}: height=${height ?: '?'}, status=[$statuses], lag=[$lag], weak=[$weak] ($instance)") + log.info("State of ${chain.chainCode}: height=${height ?: '?'}, status=[$statuses], lag=[$lag], lower block=$lowerBlockData, weak=[$weak] ($instance)") } fun test(event: UpstreamChangeEvent): Boolean { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundBlockDetector.kt new file mode 100644 index 000000000..de2ec575a --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundBlockDetector.kt @@ -0,0 +1,53 @@ +package io.emeraldpay.dshackle.upstream + +import io.emeraldpay.dshackle.Chain +import reactor.core.publisher.Mono + +abstract class RecursiveLowerBoundBlockDetector( + chain: Chain, + private val upstream: Upstream, +) : LowerBoundBlockDetector(chain, upstream) { + + override fun lowerBlockDetect(): Mono { + return Mono.just(upstream.getHead()) + .flatMap { + val currentHeight = it.getCurrentHeight() + if (currentHeight == null) { + Mono.empty() + } else { + Mono.just(LowerBoundData(0, currentHeight)) + } + } + .expand { data -> + if (data.found) { + Mono.empty() + } else { + val middle = middleBlock(data) + + if (data.left > data.right) { + val current = if (data.current == 0L) 1 else data.current + Mono.just(LowerBoundData(current, true)) + } else { + hasState(middle) + .map { + if (it) { + LowerBoundData(data.left, middle - 1, middle) + } else { + LowerBoundData(middle + 1, data.right, data.current) + } + } + } + } + } + .filter { it.found } + .next() + .map { + LowerBlockData(it.current) + } + } + + private fun middleBlock(lowerBoundData: LowerBoundData): Long = + lowerBoundData.left + (lowerBoundData.right - lowerBoundData.left) / 2 + + protected abstract fun hasState(blockNumber: Long): Mono +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index 0a7d1fb21..50fabeb25 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -44,6 +44,7 @@ interface Upstream : Lifecycle { fun getId(): String fun getCapabilities(): Set fun isGrpc(): Boolean + fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData fun cast(selfType: Class): T diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt index fa0b4922b..409ecb1c2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt @@ -25,6 +25,7 @@ import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.calls.CallMethods @@ -71,6 +72,10 @@ open class BitcoinRpcUpstream( return false } + override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData { + return LowerBoundBlockDetector.LowerBlockData.default() + } + @Suppress("UNCHECKED_CAST") override fun cast(selfType: Class): T { if (!selfType.isAssignableFrom(this.javaClass)) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 053f7a8e7..afcc1e3ed 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -12,6 +12,7 @@ import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetector import io.emeraldpay.dshackle.upstream.LogsOracle +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamValidator @@ -88,6 +89,10 @@ object EthereumChainSpecific : AbstractPollChainSpecific() { return EthereumUpstreamValidator(chain, upstream, options, config) } + override fun lowerBoundBlockDetector(chain: Chain, upstream: Upstream): LowerBoundBlockDetector { + return EthereumLowerBoundBlockDetector(chain, upstream) + } + override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector { return EthereumLabelsDetector(reader, chain) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt new file mode 100644 index 000000000..74e9de6a3 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt @@ -0,0 +1,27 @@ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.upstream.RecursiveLowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import io.emeraldpay.dshackle.upstream.toHex +import reactor.core.publisher.Mono + +class EthereumLowerBoundBlockDetector( + chain: Chain, + private val upstream: Upstream, +) : RecursiveLowerBoundBlockDetector(chain, upstream) { + + override fun hasState(blockNumber: Long): Mono { + return upstream.getIngressReader().read( + JsonRpcRequest( + "eth_getBalance", + listOf("0x756F45E3FA69347A9A973A725E3C98bC4db0b5a0", blockNumber.toHex()), + ), + ) + .flatMap(JsonRpcResponse::requireResult) + .map { true } + .onErrorReturn(false) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index 8a0c56b1e..c3dd83ac6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -18,6 +18,7 @@ import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetector import io.emeraldpay.dshackle.upstream.LogsOracle +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamValidator @@ -60,6 +61,8 @@ interface ChainSpecific { fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription fun callSelector(caches: Caches): CallSelector? + + fun lowerBoundBlockDetector(chain: Chain, upstream: Upstream): LowerBoundBlockDetector } object ChainSpecificRegistry { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 71698a098..56f87c867 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -13,6 +13,8 @@ import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetectorBuilder +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetectorBuilder import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.UpstreamValidator @@ -40,6 +42,7 @@ open class GenericUpstream( connectorFactory: ConnectorFactory, validatorBuilder: UpstreamValidatorBuilder, labelsDetectorBuilder: LabelsDetectorBuilder, + lowerBoundBlockDetectorBuilder: LowerBoundBlockDetectorBuilder, ) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig), Lifecycle { private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig) @@ -51,6 +54,8 @@ open class GenericUpstream( private var livenessSubscription: Disposable? = null private val labelsDetector = labelsDetectorBuilder(chain, this.getIngressReader()) + private val lowerBoundBlockDetector = lowerBoundBlockDetectorBuilder(chain, this) + override fun getHead(): Head { return connector.getHead() } @@ -77,6 +82,10 @@ open class GenericUpstream( return false } + override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData { + return lowerBoundBlockDetector.getCurrentLowerBlock() + } + @Suppress("UNCHECKED_CAST") override fun cast(selfType: Class): T { if (!selfType.isAssignableFrom(this.javaClass)) { @@ -162,6 +171,8 @@ open class GenericUpstream( log.debug("Error while checking live subscription for ${getId()}", it) },) detectLabels() + + detectLowerBlock() } override fun stop() { @@ -185,6 +196,15 @@ open class GenericUpstream( } } + private fun detectLowerBlock() { + lowerBoundBlockDetector.lowerBlock() + .subscribe { + stateEventStream.emitNext( + UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED), + ) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + } + } + fun getIngressSubscription(): IngressSubscription { return connector.getIngressSubscription() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt index 471c65766..887c64374 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt @@ -29,6 +29,7 @@ import io.emeraldpay.dshackle.upstream.BuildInfo import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinUpstream @@ -148,6 +149,10 @@ class BitcoinGrpcUpstream( return true } + override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData { + return LowerBoundBlockDetector.LowerBlockData.default() + } + @Suppress("UNCHECKED_CAST") override fun cast(selfType: Class): T { if (!selfType.isAssignableFrom(this.javaClass)) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt index 94dc08fb9..c2eed2e40 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt @@ -31,6 +31,7 @@ import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.forkchoice.NoChoiceWithPriorityForkChoice @@ -177,4 +178,8 @@ open class GenericGrpcUpstream( override fun isGrpc(): Boolean { return true } + + override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData { + return LowerBoundBlockDetector.LowerBlockData.default() + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt index 3c236de1d..4edf880ce 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -14,6 +14,7 @@ import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LogsOracle +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.SingleCallValidator import io.emeraldpay.dshackle.upstream.Upstream @@ -103,6 +104,10 @@ object PolkadotChainSpecific : AbstractPollChainSpecific() { ) } + override fun lowerBoundBlockDetector(chain: Chain, upstream: Upstream): LowerBoundBlockDetector { + return PolkadotLowerBoundBlockDetector(chain, upstream) + } + fun validate(data: ByteArray, peers: Int, upstreamId: String): UpstreamAvailability { val resp = Global.objectMapper.readValue(data, PolkadotHealth::class.java) if (resp.isSyncing) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundBlockDetector.kt new file mode 100644 index 000000000..e4d8edb4b --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundBlockDetector.kt @@ -0,0 +1,39 @@ +package io.emeraldpay.dshackle.upstream.polkadot + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.upstream.RecursiveLowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import io.emeraldpay.dshackle.upstream.toHex +import reactor.core.publisher.Mono + +class PolkadotLowerBoundBlockDetector( + chain: Chain, + private val upstream: Upstream, +) : RecursiveLowerBoundBlockDetector(chain, upstream) { + + override fun hasState(blockNumber: Long): Mono { + return upstream.getIngressReader().read( + JsonRpcRequest( + "chain_getBlockHash", + listOf(blockNumber.toHex()), // in polkadot state methods work only with hash + ), + ) + .flatMap(JsonRpcResponse::requireResult) + .map { + String(it, 1, it.size - 2) + } + .flatMap { + upstream.getIngressReader().read( + JsonRpcRequest( + "state_getMetadata", + listOf(it), + ), + ) + } + .flatMap(JsonRpcResponse::requireResult) + .map { true } + .onErrorReturn(false) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt index 840c49424..73105f9bb 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt @@ -13,6 +13,7 @@ import io.emeraldpay.dshackle.upstream.DefaultSolanaMethods import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LabelsDetector +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.SingleCallValidator import io.emeraldpay.dshackle.upstream.Upstream @@ -136,6 +137,10 @@ object SolanaChainSpecific : AbstractChainSpecific() { ) } + override fun lowerBoundBlockDetector(chain: Chain, upstream: Upstream): LowerBoundBlockDetector { + return SolanaLowerBoundBlockDetector(chain, upstream) + } + override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? { return null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundBlockDetector.kt new file mode 100644 index 000000000..cfee29098 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundBlockDetector.kt @@ -0,0 +1,72 @@ +package io.emeraldpay.dshackle.upstream.solana + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import reactor.core.publisher.Mono + +class SolanaLowerBoundBlockDetector( + chain: Chain, + upstream: Upstream, +) : LowerBoundBlockDetector(chain, upstream) { + private val reader = upstream.getIngressReader() + + override fun lowerBlockDetect(): Mono { + return Mono.just(reader) + .flatMap { + it.read( + JsonRpcRequest("getFirstAvailableBlock", listOf()), // in case of solana we talk about the slot of the lowest confirmed block + ) + } + .flatMap(JsonRpcResponse::requireResult) + .map { + String(it).toLong() + } + .flatMap { slot -> + reader.read( + JsonRpcRequest( + "getBlocks", + listOf( + slot - 10, + slot, + ), + ), + ) + } + .flatMap(JsonRpcResponse::requireResult) + .flatMap { + val response = Global.objectMapper.readValue(it, LongArray::class.java) + if (response == null || response.isEmpty()) { + Mono.empty() + } else { + val maxSlot = response.max() + reader.read( + JsonRpcRequest( + "getBlock", + listOf( + maxSlot, + mapOf( + "showRewards" to false, + "transactionDetails" to "none", + "maxSupportedTransactionVersion" to 0, + ), + ), + ), + ) + .flatMap(JsonRpcResponse::requireResult) + .map { blockData -> + val block = Global.objectMapper.readValue(blockData, SolanaBlock::class.java) + LowerBlockData(block.height, maxSlot) + }.onErrorResume { + Mono.empty() + } + } + } + .onErrorResume { + Mono.empty() + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt index cad631dbf..8cc99e1ae 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt @@ -8,6 +8,7 @@ import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.foundation.ChainOptions.Options +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.SingleCallValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability @@ -69,6 +70,10 @@ object StarknetChainSpecific : AbstractPollChainSpecific() { ) } + override fun lowerBoundBlockDetector(chain: Chain, upstream: Upstream): LowerBoundBlockDetector { + return StarknetLowerBoundBlockDetector(chain, upstream) + } + fun validate(data: ByteArray, lagging: Int, upstreamId: String): UpstreamAvailability { val resp = Global.objectMapper.readValue(data, StarknetSyncing::class.java) return if (resp.highest - resp.current > lagging) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundBlockDetector.kt new file mode 100644 index 000000000..4f3d19412 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundBlockDetector.kt @@ -0,0 +1,17 @@ +package io.emeraldpay.dshackle.upstream.starknet + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.Upstream +import reactor.core.publisher.Mono + +class StarknetLowerBoundBlockDetector( + chain: Chain, + upstream: Upstream, +) : LowerBoundBlockDetector(chain, upstream) { + + // for starknet we assume that all nodes are archive + override fun lowerBlockDetect(): Mono { + return Mono.just(LowerBlockData(1)) + } +} diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy index 1ce79a88b..40cc0b05e 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy @@ -20,8 +20,10 @@ import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.calls.* import io.emeraldpay.dshackle.upstream.generic.GenericUpstream @@ -29,7 +31,6 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import org.jetbrains.annotations.NotNull import org.reactivestreams.Publisher -import io.emeraldpay.dshackle.foundation.ChainOptions class GenericUpstreamMock extends GenericUpstream { EthereumHeadMock ethereumHeadMock @@ -74,6 +75,7 @@ class GenericUpstreamMock extends GenericUpstream { new ConnectorFactoryMock(api, new EthereumHeadMock()), io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&validator, io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&labelDetector, + io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&lowerBoundBlockDetector, ) this.ethereumHeadMock = this.getHead() as EthereumHeadMock setLag(0) @@ -104,4 +106,9 @@ class GenericUpstreamMock extends GenericUpstream { String toString() { return "Upstream mock ${getId()}" } + + @Override + LowerBoundBlockDetector.LowerBlockData getLowerBlock() { + return new LowerBoundBlockDetector.LowerBlockData(0, 0) + } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy index a3db552ce..f3d0dbb29 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy @@ -79,6 +79,7 @@ class FilteredApisSpec extends Specification { connectorFactory, cs.&validator, cs.&labelDetector, + cs.&lowerBoundBlockDetector ) } def matcher = new Selector.LabelMatcher("test", ["foo"]) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundBlockDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundBlockDetectorTest.kt new file mode 100644 index 000000000..63a2e72f6 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundBlockDetectorTest.kt @@ -0,0 +1,146 @@ +package io.emeraldpay.dshackle.upstream + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.polkadot.PolkadotLowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.kotlin.any +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import reactor.core.publisher.Mono +import reactor.test.StepVerifier +import java.time.Duration + +class RecursiveLowerBoundBlockDetectorTest { + + @ParameterizedTest + @MethodSource("detectors") + fun `find lower block closer to the height`( + reader: JsonRpcReader, + detectorClass: Class, + ) { + val head = mock { + on { getCurrentHeight() } doReturn 18000000 + } + val upstream = mock { + on { getHead() } doReturn head + on { getIngressReader() } doReturn reader + } + + val detector = detectorClass.getConstructor(Chain::class.java, Upstream::class.java).newInstance(Chain.UNSPECIFIED, upstream) + + StepVerifier.withVirtualTime { detector.lowerBlock() } + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(15)) + .expectNext(LowerBoundBlockDetector.LowerBlockData(17964844L)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + Assertions.assertEquals(LowerBoundBlockDetector.LowerBlockData(17964844L), detector.getCurrentLowerBlock()) + } + + @ParameterizedTest + @MethodSource("detectorsFirstBlock") + fun `lower block is 0x1`( + reader: JsonRpcReader, + detectorClass: Class, + ) { + val head = mock { + on { getCurrentHeight() } doReturn 18000000 + } + val upstream = mock { + on { getHead() } doReturn head + on { getIngressReader() } doReturn reader + } + + val detector = detectorClass.getConstructor(Chain::class.java, Upstream::class.java).newInstance(Chain.UNSPECIFIED, upstream) + + StepVerifier.withVirtualTime { detector.lowerBlock() } + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(15)) + .expectNext(LowerBoundBlockDetector.LowerBlockData(1)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + Assertions.assertEquals(LowerBoundBlockDetector.LowerBlockData(1), detector.getCurrentLowerBlock()) + } + + companion object { + private val blocks = listOf( + 9000000L, 13500000L, 15750000L, 16875000L, 17437500L, 17718750L, 17859375L, 17929688L, 17964844L, 17947266L, + 17956055L, 17960449L, 17962646L, 17963745L, 17964294L, 17964569L, 17964706L, 17964775L, 17964809L, 17964826L, + 17964835L, 17964839L, 17964841L, 17964842L, 17964843L, + ) + private const val hash1 = "0x1b1a5dd69e12aa12e2b9197be0d0cceef3dde6368ea6376ad7c8b06488c9cf6a" + private const val hash2 = "0x1b1a5dd69e12aa12e2b9197be0d0cceef3dde6368ea6376ad7c8b06488c9cf7a" + + @JvmStatic + fun detectors(): List = listOf( + Arguments.of( + mock { + blocks.forEach { + if (it == 17964844L) { + on { + read(JsonRpcRequest("eth_getBalance", listOf("0x756F45E3FA69347A9A973A725E3C98bC4db0b5a0", it.toHex()))) + } doReturn Mono.just(JsonRpcResponse(ByteArray(0), null)) + } else { + on { + read(JsonRpcRequest("eth_getBalance", listOf("0x756F45E3FA69347A9A973A725E3C98bC4db0b5a0", it.toHex()))) + } doReturn Mono.error(RuntimeException()) + } + } + }, + EthereumLowerBoundBlockDetector::class.java, + ), + Arguments.of( + mock { + blocks.forEach { + if (it == 17964844L) { + on { + read(JsonRpcRequest("chain_getBlockHash", listOf(it.toHex()))) + } doReturn Mono.just(JsonRpcResponse("\"$hash1\"".toByteArray(), null)) + on { + read(JsonRpcRequest("state_getMetadata", listOf(hash1))) + } doReturn Mono.just(JsonRpcResponse(ByteArray(0), null)) + } else { + on { + read(JsonRpcRequest("chain_getBlockHash", listOf(it.toHex()))) + } doReturn Mono.just(JsonRpcResponse("\"$hash2\"".toByteArray(), null)) + on { + read(JsonRpcRequest("state_getMetadata", listOf(hash2))) + } doReturn Mono.error(RuntimeException()) + } + } + }, + PolkadotLowerBoundBlockDetector::class.java, + ), + ) + + @JvmStatic + fun detectorsFirstBlock(): List = listOf( + Arguments.of( + mock { + on { + read(JsonRpcRequest("chain_getBlockHash", listOf(any()))) + } doReturn Mono.just(JsonRpcResponse(ByteArray(0), null)) + on { + read(JsonRpcRequest("state_getMetadata", listOf(any()))) + } doReturn Mono.just(JsonRpcResponse(ByteArray(0), null)) + }, + PolkadotLowerBoundBlockDetector::class.java, + ), + Arguments.of( + mock { + on { read(any()) } doReturn Mono.just(JsonRpcResponse(ByteArray(0), null)) + }, + EthereumLowerBoundBlockDetector::class.java, + ), + ) + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundBlockDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundBlockDetectorTest.kt new file mode 100644 index 000000000..b6646a58c --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundBlockDetectorTest.kt @@ -0,0 +1,70 @@ +package io.emeraldpay.dshackle.upstream.solana + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import reactor.core.publisher.Mono +import reactor.test.StepVerifier +import java.time.Duration + +class SolanaLowerBoundBlockDetectorTest { + + @Test + fun `get solana lower block and slot`() { + val reader = mock { + on { read(JsonRpcRequest("getFirstAvailableBlock", listOf())) } doReturn + Mono.just(JsonRpcResponse("25000000".toByteArray(), null)) + on { read(JsonRpcRequest("getBlocks", listOf(24999990L, 25000000L))) } doReturn + Mono.just(JsonRpcResponse("[23000000, 23000005, 23000010]".toByteArray(), null)) + on { + read( + JsonRpcRequest( + "getBlock", + listOf( + 23000010L, + mapOf( + "showRewards" to false, + "transactionDetails" to "none", + "maxSupportedTransactionVersion" to 0, + ), + ), + ), + ) + } doReturn Mono.just( + JsonRpcResponse( + Global.objectMapper.writeValueAsBytes( + mapOf( + "blockHeight" to 21000000, + "blockTime" to 111, + "blockhash" to "22", + "previousBlockhash" to "33", + ), + ), + null, + ), + ) + } + val upstream = mock { + on { getIngressReader() } doReturn reader + } + + val detector = SolanaLowerBoundBlockDetector(Chain.UNSPECIFIED, upstream) + + StepVerifier.withVirtualTime { detector.lowerBlock() } + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(15)) + .expectNext(LowerBoundBlockDetector.LowerBlockData(21000000, 23000010)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + Assertions.assertEquals(LowerBoundBlockDetector.LowerBlockData(21000000, 23000010), detector.getCurrentLowerBlock()) + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundBlockDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundBlockDetectorTest.kt new file mode 100644 index 000000000..456b1452f --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundBlockDetectorTest.kt @@ -0,0 +1,24 @@ +package io.emeraldpay.dshackle.upstream.starknet + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector +import io.emeraldpay.dshackle.upstream.Upstream +import org.junit.jupiter.api.Test +import org.mockito.kotlin.mock +import reactor.test.StepVerifier +import java.time.Duration + +class StarknetLowerBoundBlockDetectorTest { + + @Test + fun `starknet lower block is 1`() { + val detector = StarknetLowerBoundBlockDetector(Chain.UNSPECIFIED, mock()) + + StepVerifier.withVirtualTime { detector.lowerBlock() } + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(15)) + .expectNext(LowerBoundBlockDetector.LowerBlockData(1)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + } +}