Skip to content

Commit

Permalink
#7 Add event flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Heonbyeong committed Jul 12, 2022
1 parent 1a17714 commit 198d710
Showing 1 changed file with 53 additions and 0 deletions.
53 changes: 53 additions & 0 deletions presentation/src/main/java/com/knowing/draven/util/EventFlow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.knowing.draven.util

import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean

interface EventFlow<out T> : Flow<T> {

companion object {
const val DEFAULT_REPLAY: Int = 3
}
}

interface MutableEventFlow<T> : EventFlow<T>, FlowCollector<T>

@Suppress("FunctionName")
fun <T> MutableEventFlow(
replay: Int = EventFlow.DEFAULT_REPLAY
): MutableEventFlow<T> = EventFlowImpl(replay)

fun <T> MutableEventFlow<T>.asEventFlow(): EventFlow<T> = ReadOnlyEventFlow(this)

private class ReadOnlyEventFlow<T>(flow: EventFlow<T>) : EventFlow<T> by flow

private class EventFlowImpl<T>(
replay: Int
) : MutableEventFlow<T> {

private val flow: MutableSharedFlow<EventFlowSlot<T>> = MutableSharedFlow(replay = replay)

@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<T>) = flow
.collect { slot ->
if(!slot.markConsumed()) {
collector.emit(slot.value)
}
}

override suspend fun emit(value: T) {
flow.emit(EventFlowSlot(value))
}

private class EventFlowSlot<T>(val value: T) {

private val consumed: AtomicBoolean = AtomicBoolean(false)

fun markConsumed(): Boolean = consumed.getAndSet(true)
}

}

0 comments on commit 198d710

Please sign in to comment.