Skip to content

Commit

Permalink
report only available subscription in generic case
Browse files Browse the repository at this point in the history
  • Loading branch information
a10zn8 committed Nov 27, 2023
1 parent db405a8 commit 9d06f6f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import reactor.core.scheduler.Scheduler
class GenericEgressSubscription(
val multistream: Multistream,
val scheduler: Scheduler,
val methods: List<String>,
) : EgressSubscription {
override fun getAvailableTopics(): List<String> {
return methods
return multistream.getUpstreams()
.flatMap { (it as GenericUpstream).getIngressSubscription().getAvailableTopics() }
.distinct()
}

override fun subscribe(topic: String, params: Any?, matcher: Matcher): Flux<ByteArray> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@ import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap

class GenericIngressSubscription(val conn: WsSubscriptions) : IngressSubscription {
class GenericIngressSubscription(val conn: WsSubscriptions, val methods: List<String>) : IngressSubscription {
override fun getAvailableTopics(): List<String> {
return emptyList() // not used now
return methods
}

private val holders = ConcurrentHashMap<Pair<String, Any?>, SubscriptionConnect<out Any>>()

@Suppress("UNCHECKED_CAST")
override fun <T> get(topic: String, params: Any?): SubscriptionConnect<T> {
return holders.computeIfAbsent(topic to params, { key -> GenericSubscriptionConnect(conn, key.first, key.second) }) as SubscriptionConnect<T>
return holders.computeIfAbsent(topic to params) { key ->
GenericSubscriptionConnect(
conn,
key.first,
key.second,
)
} as SubscriptionConnect<T>
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object PolkadotChainSpecific : AbstractPollChainSpecific() {
}

override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription {
return { ms -> GenericEgressSubscription(ms, headScheduler, DefaultPolkadotMethods.subs.map { it.first }) }
return { ms -> GenericEgressSubscription(ms, headScheduler) }
}

override fun validator(
Expand Down Expand Up @@ -118,7 +118,7 @@ object PolkadotChainSpecific : AbstractPollChainSpecific() {
}

override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription {
return GenericIngressSubscription(ws)
return GenericIngressSubscription(ws, DefaultPolkadotMethods.subs.map { it.first })
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ object SolanaChainSpecific : AbstractChainSpecific() {
}

override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription {
return GenericIngressSubscription(ws)
return GenericIngressSubscription(ws, DefaultSolanaMethods.subs.map { it.first })
}

override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription {
return { ms -> GenericEgressSubscription(ms, headScheduler, DefaultSolanaMethods.subs.map { it.first }) }
return { ms -> GenericEgressSubscription(ms, headScheduler) }
}
}

Expand Down

0 comments on commit 9d06f6f

Please sign in to comment.