diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt index de1363e25..06fca1fe3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt @@ -9,10 +9,11 @@ import reactor.core.scheduler.Scheduler class GenericEgressSubscription( val multistream: Multistream, val scheduler: Scheduler, - val methods: List, ) : EgressSubscription { override fun getAvailableTopics(): List { - return methods + return multistream.getUpstreams() + .flatMap { (it as GenericUpstream).getIngressSubscription().getAvailableTopics() } + .distinct() } override fun subscribe(topic: String, params: Any?, matcher: Matcher): Flux { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt index d3abbde99..6b9937467 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt @@ -10,16 +10,22 @@ import reactor.core.publisher.Mono import java.time.Duration import java.util.concurrent.ConcurrentHashMap -class GenericIngressSubscription(val conn: WsSubscriptions) : IngressSubscription { +class GenericIngressSubscription(val conn: WsSubscriptions, val methods: List) : IngressSubscription { override fun getAvailableTopics(): List { - return emptyList() // not used now + return methods } private val holders = ConcurrentHashMap, SubscriptionConnect>() @Suppress("UNCHECKED_CAST") override fun get(topic: String, params: Any?): SubscriptionConnect { - return holders.computeIfAbsent(topic to params, { key -> GenericSubscriptionConnect(conn, key.first, key.second) }) as SubscriptionConnect + return holders.computeIfAbsent(topic to params) { key -> + GenericSubscriptionConnect( + conn, + key.first, + key.second, + ) + } as SubscriptionConnect } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt index 4a929d43f..3c236de1d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -83,7 +83,7 @@ object PolkadotChainSpecific : AbstractPollChainSpecific() { } override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { - return { ms -> GenericEgressSubscription(ms, headScheduler, DefaultPolkadotMethods.subs.map { it.first }) } + return { ms -> GenericEgressSubscription(ms, headScheduler) } } override fun validator( @@ -118,7 +118,7 @@ object PolkadotChainSpecific : AbstractPollChainSpecific() { } override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { - return GenericIngressSubscription(ws) + return GenericIngressSubscription(ws, DefaultPolkadotMethods.subs.map { it.first }) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt index dd9d040ca..840c49424 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt @@ -141,11 +141,11 @@ object SolanaChainSpecific : AbstractChainSpecific() { } override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { - return GenericIngressSubscription(ws) + return GenericIngressSubscription(ws, DefaultSolanaMethods.subs.map { it.first }) } override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { - return { ms -> GenericEgressSubscription(ms, headScheduler, DefaultSolanaMethods.subs.map { it.first }) } + return { ms -> GenericEgressSubscription(ms, headScheduler) } } }