Skip to content

Commit

Permalink
Add determination of the lower block (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Dec 26, 2023
1 parent e79df25 commit eb774d7
Show file tree
Hide file tree
Showing 26 changed files with 607 additions and 9 deletions.
2 changes: 1 addition & 1 deletion emerald-grpc
2 changes: 2 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/Describe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Describe(
.addAllSupportedSubscriptions(chainUpstreams.getEgressSubscription().getAvailableTopics())
.setStatus(status)
.setCurrentHeight(chainUpstreams.getHead().getCurrentHeight() ?: 0)
.setCurrentLowerBlock(chainUpstreams.getLowerBlock().blockNumber)
.setCurrentLowerSlot(chainUpstreams.getLowerBlock().slot ?: 0)
chainUpstreams.getQuorumLabels()
.forEach { node ->
val nodeDetails = BlockchainOuterClass.NodeDetails.newBuilder()
Expand Down
10 changes: 7 additions & 3 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/StreamHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.Common
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.MultistreamHolder
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
Expand All @@ -39,20 +40,23 @@ class StreamHead(
return requestMono.map { request ->
Chain.byId(request.type.number)
}.flatMapMany { chain ->
multistreamHolder.getUpstream(chain).getHead()
val ms = multistreamHolder.getUpstream(chain)
ms.getHead()
.getFlux()
.map { asProto(chain, it!!) }
.map { asProto(ms, chain, it!!) }
.onErrorContinue { t, _ ->
log.warn("Head subscription error", t)
}
}
}

fun asProto(chain: Chain, block: BlockContainer): BlockchainOuterClass.ChainHead {
fun asProto(ms: Multistream, chain: Chain, block: BlockContainer): BlockchainOuterClass.ChainHead {
return BlockchainOuterClass.ChainHead.newBuilder()
.setChainValue(chain.id)
.setHeight(block.height)
.setSlot(block.slot)
.setCurrentLowerBlock(ms.getLowerBlock().blockNumber)
.setCurrentLowerSlot(ms.getLowerBlock().slot ?: 0)
.setTimestamp(block.timestamp.toEpochMilli())
.setWeight(ByteString.copyFrom(block.difficulty.toByteArray()))
.setBlockId(block.hash.toHex())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ open class GenericUpstreamCreator(
connectorFactory,
cs::validator,
cs::labelDetector,
cs::lowerBoundBlockDetector,
)

upstream.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.Chain
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicReference

typealias LowerBoundBlockDetectorBuilder = (Chain, Upstream) -> LowerBoundBlockDetector

fun Long.toHex() = "0x${this.toString(16)}"

abstract class LowerBoundBlockDetector(
private val chain: Chain,
private val upstream: Upstream,
) {
private val currentLowerBlock = AtomicReference(LowerBlockData.default())

private val log = LoggerFactory.getLogger(this::class.java)

fun lowerBlock(): Flux<LowerBlockData> {
return Flux.interval(
Duration.ofSeconds(15),
Duration.ofSeconds(60),
)
.flatMap { lowerBlockDetect() }
.filter { it.blockNumber > currentLowerBlock.get().blockNumber }
.map {
log.info("Lower block of ${upstream.getId()} $chain: block height - {}, slot - {}", it.blockNumber, it.slot ?: "NA")

currentLowerBlock.set(it)
it
}
}

fun getCurrentLowerBlock(): LowerBlockData = currentLowerBlock.get()

protected abstract fun lowerBlockDetect(): Mono<LowerBlockData>

data class LowerBlockData(
val blockNumber: Long,
val slot: Long?,
) : Comparable<LowerBlockData> {
constructor(blockNumber: Long) : this(blockNumber, null)

companion object {
fun default() = LowerBlockData(0, 0)
}

override fun compareTo(other: LowerBlockData): Int {
return this.blockNumber.compareTo(other.blockNumber)
}
}

data class LowerBoundData(
val left: Long,
val right: Long,
val current: Long,
val found: Boolean,
) {
constructor(left: Long, right: Long) : this(left, right, 0, false)

constructor(left: Long, right: Long, current: Long) : this(left, right, current, false)

constructor(current: Long, found: Boolean) : this(0, 0, current, found)
}
}
16 changes: 12 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ abstract class Multistream(
@Volatile
private var capabilities: Set<Capability> = emptySet()

@Volatile
private var lowerBlock: LowerBoundBlockDetector.LowerBlockData = LowerBoundBlockDetector.LowerBlockData.default()

@Volatile
private var quorumLabels: List<QuorumForLabels.QuorumItem>? = null
private val meters: MutableMap<String, List<Meter.Id>> = HashMap()
Expand Down Expand Up @@ -227,6 +230,10 @@ abstract class Multistream(
}
}
quorumLabels = getQuorumLabels(availableUpstreams)
availableUpstreams
.filter { it.getLowerBlock() != LowerBoundBlockDetector.LowerBlockData.default() }
.minOfOrNull { it.getLowerBlock() }
?.let { lowerBlock = it }
when {
upstreams.size == 1 -> {
lagObserver?.stop()
Expand Down Expand Up @@ -313,11 +320,11 @@ abstract class Multistream(
started = true
}

override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData = lowerBlock

private fun observeUpstreamsStatuses() {
subscribeAddedUpstreams()
.distinctUntilChanged {
it.getId()
}.flatMap { upstream ->
.flatMap { upstream ->
val statusStream = upstream.observeStatus()
.map { UpstreamChangeEvent(this.chain, upstream, UpstreamChangeEvent.ChangeType.UPDATED) }
val stateStream = upstream.observeState()
Expand Down Expand Up @@ -407,9 +414,10 @@ abstract class Multistream(
val weak = getUpstreams()
.filter { it.getStatus() != UpstreamAvailability.OK }
.joinToString(", ") { it.getId() }
val lowerBlockData = "[height=${lowerBlock.blockNumber}, slot=${lowerBlock.slot ?: "NA"}]"

val instance = System.identityHashCode(this).toString(16)
log.info("State of ${chain.chainCode}: height=${height ?: '?'}, status=[$statuses], lag=[$lag], weak=[$weak] ($instance)")
log.info("State of ${chain.chainCode}: height=${height ?: '?'}, status=[$statuses], lag=[$lag], lower block=$lowerBlockData, weak=[$weak] ($instance)")
}

fun test(event: UpstreamChangeEvent): Boolean {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.Chain
import reactor.core.publisher.Mono

abstract class RecursiveLowerBoundBlockDetector(
chain: Chain,
private val upstream: Upstream,
) : LowerBoundBlockDetector(chain, upstream) {

override fun lowerBlockDetect(): Mono<LowerBlockData> {
return Mono.just(upstream.getHead())
.flatMap {
val currentHeight = it.getCurrentHeight()
if (currentHeight == null) {
Mono.empty()
} else {
Mono.just(LowerBoundData(0, currentHeight))
}
}
.expand { data ->
if (data.found) {
Mono.empty()
} else {
val middle = middleBlock(data)

if (data.left > data.right) {
val current = if (data.current == 0L) 1 else data.current
Mono.just(LowerBoundData(current, true))
} else {
hasState(middle)
.map {
if (it) {
LowerBoundData(data.left, middle - 1, middle)
} else {
LowerBoundData(middle + 1, data.right, data.current)
}
}
}
}
}
.filter { it.found }
.next()
.map {
LowerBlockData(it.current)
}
}

private fun middleBlock(lowerBoundData: LowerBoundData): Long =
lowerBoundData.left + (lowerBoundData.right - lowerBoundData.left) / 2

protected abstract fun hasState(blockNumber: Long): Mono<Boolean>
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ interface Upstream : Lifecycle {
fun getId(): String
fun getCapabilities(): Set<Capability>
fun isGrpc(): Boolean
fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData

fun <T : Upstream> cast(selfType: Class<T>): T

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.calls.CallMethods
Expand Down Expand Up @@ -71,6 +72,10 @@ open class BitcoinRpcUpstream(
return false
}

override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData {
return LowerBoundBlockDetector.LowerBlockData.default()
}

@Suppress("UNCHECKED_CAST")
override fun <T : Upstream> cast(selfType: Class<T>): T {
if (!selfType.isAssignableFrom(this.javaClass)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LabelsDetector
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamValidator
Expand Down Expand Up @@ -88,6 +89,10 @@ object EthereumChainSpecific : AbstractPollChainSpecific() {
return EthereumUpstreamValidator(chain, upstream, options, config)
}

override fun lowerBoundBlockDetector(chain: Chain, upstream: Upstream): LowerBoundBlockDetector {
return EthereumLowerBoundBlockDetector(chain, upstream)
}

override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector {
return EthereumLabelsDetector(reader, chain)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.upstream.RecursiveLowerBoundBlockDetector
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.dshackle.upstream.toHex
import reactor.core.publisher.Mono

class EthereumLowerBoundBlockDetector(
chain: Chain,
private val upstream: Upstream,
) : RecursiveLowerBoundBlockDetector(chain, upstream) {

override fun hasState(blockNumber: Long): Mono<Boolean> {
return upstream.getIngressReader().read(
JsonRpcRequest(
"eth_getBalance",
listOf("0x756F45E3FA69347A9A973A725E3C98bC4db0b5a0", blockNumber.toHex()),
),
)
.flatMap(JsonRpcResponse::requireResult)
.map { true }
.onErrorReturn(false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LabelsDetector
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamValidator
Expand Down Expand Up @@ -60,6 +61,8 @@ interface ChainSpecific {
fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription

fun callSelector(caches: Caches): CallSelector?

fun lowerBoundBlockDetector(chain: Chain, upstream: Upstream): LowerBoundBlockDetector
}

object ChainSpecificRegistry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LabelsDetectorBuilder
import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector
import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetectorBuilder
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.UpstreamValidator
Expand Down Expand Up @@ -40,6 +42,7 @@ open class GenericUpstream(
connectorFactory: ConnectorFactory,
validatorBuilder: UpstreamValidatorBuilder,
labelsDetectorBuilder: LabelsDetectorBuilder,
lowerBoundBlockDetectorBuilder: LowerBoundBlockDetectorBuilder,
) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig), Lifecycle {

private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig)
Expand All @@ -51,6 +54,8 @@ open class GenericUpstream(
private var livenessSubscription: Disposable? = null
private val labelsDetector = labelsDetectorBuilder(chain, this.getIngressReader())

private val lowerBoundBlockDetector = lowerBoundBlockDetectorBuilder(chain, this)

override fun getHead(): Head {
return connector.getHead()
}
Expand All @@ -77,6 +82,10 @@ open class GenericUpstream(
return false
}

override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData {
return lowerBoundBlockDetector.getCurrentLowerBlock()
}

@Suppress("UNCHECKED_CAST")
override fun <T : Upstream> cast(selfType: Class<T>): T {
if (!selfType.isAssignableFrom(this.javaClass)) {
Expand Down Expand Up @@ -162,6 +171,8 @@ open class GenericUpstream(
log.debug("Error while checking live subscription for ${getId()}", it)
},)
detectLabels()

detectLowerBlock()
}

override fun stop() {
Expand All @@ -185,6 +196,15 @@ open class GenericUpstream(
}
}

private fun detectLowerBlock() {
lowerBoundBlockDetector.lowerBlock()
.subscribe {
stateEventStream.emitNext(
UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED),
) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
}
}

fun getIngressSubscription(): IngressSubscription {
return connector.getIngressSubscription()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import io.emeraldpay.dshackle.upstream.BuildInfo
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.LowerBoundBlockDetector
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinUpstream
Expand Down Expand Up @@ -148,6 +149,10 @@ class BitcoinGrpcUpstream(
return true
}

override fun getLowerBlock(): LowerBoundBlockDetector.LowerBlockData {
return LowerBoundBlockDetector.LowerBlockData.default()
}

@Suppress("UNCHECKED_CAST")
override fun <T : Upstream> cast(selfType: Class<T>): T {
if (!selfType.isAssignableFrom(this.javaClass)) {
Expand Down
Loading

0 comments on commit eb774d7

Please sign in to comment.