Understanding the internals of Flow, StateFlow, and SharedFlow

In this article, you'll dive deep into the internal mechanisms of Flow, StateFlow, and SharedFlow, exploring how they work under the hood.

Jaewoong Eum

Kotlin Coroutines introduced Flow as a reactive stream abstraction, fundamentally changing how Android developers handle asynchronous data streams. While LiveData served the Android community for years, Flow emerged as a more flexible, coroutine-native solution. But the power of Flow goes far beyond its clean API; it lies in the sophisticated internal machinery that makes it both performant and safe.

In this article, you’ll dive deep into the internal mechanisms of Flow, StateFlow, and SharedFlow, exploring how they work under the hood, the engineering decisions that power them, and the subtle optimizations that make them production-ready.

Understanding the core abstraction: What makes Flow special

At its heart, Flow is an asynchronous data stream that sequentially emits values. But what distinguishes Flow from other reactive streams is its adherence to two fundamental properties: context preservation and exception transparency.

Context preservation means a flow encapsulates its own execution context and never leaks it downstream. You can’t accidentally emit from the wrong dispatcher.

val myFlow = flow {
    // withContext(Dispatchers.IO) { // This would throw IllegalStateException
    emit(1) // OK - emits in the collector's context
    coroutineScope {
        emit(2) // OK - still the same coroutine context
    }
}

Exception transparency ensures that when an emission fails, the flow immediately stops and propagates the exception. This makes flows predictable and composable: you can reason about error handling locally without worrying about upstream flows swallowing exceptions.
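
To see exception transparency from the caller's side, here's a minimal sketch (the failing flow and main function are illustrative, not from the library): the upstream failure stops emission immediately and is visible to the downstream catch operator.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// A failing upstream flow: the exception thrown after the first emission
// cancels the flow and propagates downstream.
val failing = flow {
    emit(1)
    error("upstream failure")
}

fun main() = runBlocking {
    failing
        .catch { e -> println("caught downstream: ${e.message}") }
        .collect { println("received $it") }
    // prints "received 1", then "caught downstream: upstream failure"
}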

These properties are not just guidelines; they're enforced at runtime. The Flow API is carefully designed to prevent violations, and attempting to break these rules results in an exception. For example, if you uncomment the withContext block in the flow above and collect it, you'll get a java.lang.IllegalStateException: Flow invariant is violated.

The Flow interface and AbstractFlow implementation

If you examine the Flow interface, it’s pretty minimal:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

A single suspending function. That's it. This simplicity is deliberate: Flow is cold by nature, meaning each collection starts a new execution. The complexity lies in how this simple interface is implemented.
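
To make the cold semantics concrete, here's a tiny runnable sketch (names are illustrative): the builder block runs once per collection, from the beginning.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val cold = flow {
        println("builder started") // runs once per collector
        emit(1)
    }
    cold.collect { println("first collector: $it") }
    cold.collect { println("second collector: $it") }
    // "builder started" prints twice: each collect() is a fresh execution
}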

The recommended way to implement Flow is through AbstractFlow, which provides critical safety guardrails:

@ExperimentalCoroutinesApi
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

The AbstractFlow implementation wraps every collector in a SafeCollector. This wrapper is the enforcer of context preservation: it tracks the coroutine context and throws an IllegalStateException if you try to emit from a different context. This is how the flow builder prevents withContext calls during emission.

When you call emit() on a SafeCollector, it performs a context check before allowing the emission, as the internal code below shows:

// Inside SafeCollector implementation
public override suspend fun emit(value: T) {
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            emit(uCont, value)
        } catch (e: Throwable) {
            // save the fact that exception was thrown
            lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
            throw e
        }
    }
}

The SafeCollector compares the current coroutine context with the collector's context. If they don't match, it knows you've switched contexts illegally and throws an exception. This safety is what makes Flow's context preservation guarantee possible.
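
As a hands-on illustration of what SafeCollector guards against (a sketch, not library code), compare these two flows: collecting the first one throws the invariant-violation exception, while the second uses flowOn, the sanctioned way to move upstream work to another dispatcher.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collecting this flow throws "Flow invariant is violated":
// emit() runs in a context different from the collector's.
val broken = flow {
    withContext(Dispatchers.IO) {
        emit(1)
    }
}

// The sanctioned alternative: emit in the flow's own context and let
// flowOn switch the context for the upstream only.
val correct = flow {
    emit(1)
}.flowOn(Dispatchers.IO)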

Hot vs Cold: SharedFlow enters the picture

While regular Flow is cold (each collector triggers an independent execution), many use cases require hot flows: flows that emit values regardless of whether anyone is collecting. This is where SharedFlow comes in.

SharedFlow is fundamentally different from Flow. It's a broadcast mechanism that shares emitted values among all active collectors. Think of it like a radio station: it broadcasts whether or not anyone is listening (though its behavior adapts based on whether there are subscribers).

The SharedFlow architecture

If you examine the SharedFlow interface, you’ll notice it extends Flow:

public interface SharedFlow<out T> : Flow<T> {
    public val replayCache: List<T>
    override suspend fun collect(collector: FlowCollector<T>): Nothing
}

The key difference? The collect function's return type is Nothing. This signals that SharedFlow never completes. A call to collect() on a SharedFlow will run forever unless cancelled. This is the essence of hot flows: they're always active.
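
In practice, this means a SharedFlow collector must live in its own coroutine and be cancelled explicitly, as in this small sketch (names are illustrative):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val events = MutableSharedFlow<Int>()
    val job = launch {
        events.collect { println("got $it") }
        // unreachable: collect() on a SharedFlow never returns normally
    }
    yield() // let the collector subscribe before emitting
    events.emit(1)
    job.cancel() // cancellation is the only way this collector stops
}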

The internal buffer architecture

The real sophistication lies in SharedFlowImpl, which manages a complex circular buffer to handle replay cache, buffering, and queued emitters. Let’s examine its structure:

// code from kotlinx.coroutines
internal open class SharedFlowImpl<T>(
    private val replay: Int,
    private val bufferCapacity: Int,
    private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    /*
        Logical structure of the buffer

                  buffered values
             /-----------------------\
                          replayCache      queued emitters
                          /----------\/----------------------\
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
         |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
               ^           ^           ^                      ^
               |           |           |                      |
              head         |      head + bufferSize     head + totalSize
               |           |           |
     index of the slowest  |    index of the fastest
      possible collector   |     possible collector
               |           |
               |     replayIndex == new collector's index
               \---------------------- /
          range of possible minCollectorIndex

          head == minOf(minCollectorIndex, replayIndex) // by definition
          totalSize == bufferSize + queueSize // by definition

       INVARIANTS:
          minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
          replayIndex <= head + bufferSize
     */
    private var buffer: Array<Any?>? = null // allocated when needed, always power of two
    private var replayIndex = 0L // minimal index from which new collector gets values
    private var minCollectorIndex = 0L // minimal index of active collectors
    private var bufferSize = 0 // number of buffered values
    private var queueSize = 0 // number of queued emitters

This comment from the coroutines library is a goldmine of implementation detail. The buffer is a circular array that logically contains three regions:

  1. Replay cache (from replayIndex to head + bufferSize): Values that new collectors immediately receive
  2. Buffered values (from head to head + bufferSize): Values available to collectors
  3. Queued emitters (from head + bufferSize to head + totalSize): Suspended emitters waiting for slow collectors

The circular buffer optimization is important for performance. Instead of shifting elements when the head moves forward, the implementation uses modulo arithmetic to wrap around:

private fun Array<Any?>.getBufferAt(index: Long) = get(index.toInt() and (size - 1))
private fun Array<Any?>.setBufferAt(index: Long, item: Any?) = set(index.toInt() and (size - 1), item)

The and (size - 1) operation is a fast modulo for power-of-two sizes. This is why the buffer is always allocated in powers of two: it turns an expensive division into a single bitwise AND.
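
A tiny standalone check (not library code) demonstrates the equivalence:

fun main() {
    val size = 8 // power of two: 1000 in binary, so size - 1 == 0111
    for (index in 5L..10L) {
        val wrapped = index.toInt() and (size - 1)
        check(wrapped == (index % size).toInt()) // same result as % size
        println("index $index wraps to slot $wrapped")
    }
}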

The slot allocation system

Both StateFlow and SharedFlow use a slot allocation system to track active collectors. This is managed by AbstractSharedFlow:

internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    protected var slots: Array<S?>? = null // allocated when needed
        private set
    protected var nCollectors = 0 // number of allocated (!free) slots
        private set
    private var nextIndex = 0 // oracle for the next free slot index

When a collector starts, it allocates a slot:

protected fun allocateSlot(): S {
    val subscriptionCount: SubscriptionCountStateFlow?
    val slot = synchronized(this) {
        val slots = when (val curSlots = slots) {
            null -> createSlotArray(2).also { slots = it }
            else -> if (nCollectors >= curSlots.size) {
                curSlots.copyOf(2 * curSlots.size).also { slots = it }
            } else {
                curSlots
            }
        }
        var index = nextIndex
        var slot: S
        while (true) {
            slot = slots[index] ?: createSlot().also { slots[index] = it }
            index++
            if (index >= slots.size) index = 0
            if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break
        }
        nextIndex = index
        nCollectors++
        subscriptionCount = _subscriptionCount
        slot
    }
    subscriptionCount?.increment(1)
    return slot
}

This allocation strategy has several clever optimizations:

Slot reuse: Slots are never destroyed, only marked as free. When a new collector arrives, it reuses an existing slot rather than allocating a new one. This eliminates allocation churn in scenarios where collectors come and go frequently.

Amortized O(1) allocation: The array doubles in size when full, giving amortized constant-time allocation just like ArrayList.

Predictable next slot search: The nextIndex oracle remembers where the last successful allocation happened, making the next search start from a likely free slot instead of always starting from zero.

For SharedFlow specifically, each slot tracks the collector's position in the buffer:

internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
    @JvmField
    var index = -1L // current "to-be-emitted" index, -1 means free

    @JvmField
    var cont: Continuation<Unit>? = null // collector waiting for new value

    override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
        if (index >= 0) return false // not free
        index = flow.updateNewCollectorIndexLocked()
        return true
    }
}

The index field is the key piece here: it tracks where this collector is in the stream of values. When a new value is emitted, the SharedFlow checks all slots and resumes those whose index lags behind the buffer.

The emission flow in SharedFlow

Now, let’s explore what happens when you emit a value to a SharedFlow with tryEmit():

override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {
        if (tryEmitLocked(value)) {
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}

The critical observation: emissions happen in two phases, and continuations are resumed outside the lock. This prevents deadlocks when using unconfined dispatchers and is a recurring pattern throughout the coroutines library.
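
The pattern generalizes beyond SharedFlow. Here's a minimal sketch of the same idea (a hypothetical TwoPhaseNotifier, not library code): snapshot the continuations under the lock, then resume them after releasing it, because resumption can run arbitrary user code that might re-enter the structure.

import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

class TwoPhaseNotifier<T> {
    private val lock = Any()
    private val waiters = mutableListOf<Continuation<T>>()

    suspend fun await(): T = suspendCoroutine { cont ->
        synchronized(lock) { waiters += cont } // park the caller
    }

    fun publish(value: T) {
        val toResume: List<Continuation<T>>
        synchronized(lock) {
            toResume = waiters.toList() // phase 1: snapshot under the lock
            waiters.clear()
        }
        // phase 2: resume outside the lock, where user code may run freely
        for (cont in toResume) cont.resume(value)
    }
}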

Inside tryEmitLocked, the behavior depends on whether there are active collectors:

private fun tryEmitLocked(value: T): Boolean {
    // Fast path without collectors -> no buffering
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)

    // With collectors we'll have to buffer
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false // will suspend
            BufferOverflow.DROP_LATEST -> return true // just drop incoming
            BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest
        }
    }
    enqueueLocked(value)
    bufferSize++
    if (bufferSize > bufferCapacity) dropOldestLocked()
    if (replaySize > replay) {
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    return true
}

When there are no collectors, SharedFlow takes a fast path:

private fun tryEmitNoCollectorsLocked(value: T): Boolean {
    assert { nCollectors == 0 }
    if (replay == 0) return true // no need to replay, just forget it
    enqueueLocked(value)
    bufferSize++
    if (bufferSize > replay) dropOldestLocked()
    minCollectorIndex = head + bufferSize
    return true
}

This is an important optimization: when there are no subscribers, SharedFlow only maintains the replay cache. It doesn't allocate extra buffer space, and buffer overflow strategies don't apply; the behavior is effectively BufferOverflow.DROP_OLDEST with a buffer size equal to replay.
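
You can observe this fast path through the public API; a short sketch under these assumptions (replay = 1, default SUSPEND strategy, no subscribers):

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 1)
    shared.tryEmit(1) // no subscribers: only the replay cache is maintained
    shared.tryEmit(2) // evicts 1, DROP_OLDEST-style, despite the SUSPEND strategy
    println(shared.replayCache) // [2]
    println(shared.first()) // 2: a late collector starts from the cache
}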

When tryEmit returns false (buffer is full with SUSPEND strategy), emit() must suspend:

private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitter = synchronized(this) lock@{
        if (tryEmitLocked(value)) {
            cont.resume(Unit)
            resumes = findSlotsToResumeLocked(resumes)
            return@lock null
        }
        // add suspended emitter to the buffer
        Emitter(this, head + totalSize, value, cont).also {
            enqueueLocked(it)
            queueSize++
            if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
        }
    }
    emitter?.let { cont.disposeOnCancellation(it) }
    for (r in resumes) r?.resume(Unit)
}

The suspended emitter is stored directly in the buffer as an Emitter object. When space becomes available (because a slow collector catches up or is cancelled), the SharedFlow will find these Emitter objects and resume them:

for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
    val emitter = buffer.getBufferAt(curEmitterIndex)
    if (emitter !== NO_VALUE) {
        emitter as Emitter
        resumes[resumeCount++] = emitter.cont
        buffer.setBufferAt(curEmitterIndex, NO_VALUE)
        buffer.setBufferAt(newBufferEndIndex, emitter.value)
        newBufferEndIndex++
        if (resumeCount >= maxResumeCount) break
    }
}

This is an elegant design: queued emitters aren't stored in a separate data structure. They live in the same circular buffer as values, distinguished by their type. This unifies the buffer management logic.
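
The queueing behavior is easy to observe from the outside with a zero-capacity shared flow (a small sketch, not library code): once a subscriber exists, tryEmit fails, and emit parks as a queued Emitter until the collector takes the value.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val shared = MutableSharedFlow<Int>() // replay = 0, no extra buffer
    val job = launch {
        shared.collect { println("collected $it") }
    }
    yield() // let the collector subscribe
    println(shared.tryEmit(1)) // false: emitting would have to suspend
    shared.emit(1) // suspends until the collector takes the value
    job.cancel()
}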

StateFlow: A specialized SharedFlow

StateFlow is essentially a SharedFlow with specific configuration and additional constraints. Looking at the factory function:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

StateFlowImpl extends AbstractSharedFlow and implements both MutableStateFlow and (through it) SharedFlow:

private class StateFlowImpl<T>(
    initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialState)
    private var sequence = 0 // serializes updates, value update in process when odd

The key difference from SharedFlow: StateFlow always has exactly one value, and it uses equality-based conflation. The _state is an atomic reference, and the sequence counter implements a clever synchronization protocol.
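
A short sketch of the equality-based conflation from the caller's perspective:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val state = MutableStateFlow("idle")
    val job = launch { state.collect { println("state: $it") } }
    yield() // the collector receives the initial "idle"
    state.value = "idle" // equal to the current value: no re-emission
    state.value = "loading" // different value: collectors are resumed
    yield()
    job.cancel()
    // prints "state: idle" then "state: loading", never "idle" twice
}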

StateFlow's update mechanism is more complicated than a simple lock:

private fun updateState(expectedState: Any?, newState: Any): Boolean {
    var curSequence: Int
    var curSlots: Array<StateFlowSlot?>?
    synchronized(this) {
        val oldState = _state.value
        if (expectedState != null && oldState != expectedState) return false
        if (oldState == newState) return true // conflation via equality
        _state.value = newState
        curSequence = sequence
        if (curSequence and 1 == 0) { // even means quiescent
            curSequence++ // make it odd (update in progress)
            sequence = curSequence
        } else {
            // update already in progress, notify it
            sequence = curSequence + 2 // keep it odd
            return true
        }
        curSlots = slots
    }
    // Resume collectors outside the lock
    while (true) {
        curSlots?.forEach { it?.makePending() }
        synchronized(this) {
            if (sequence == curSequence) {
                sequence = curSequence + 1 // make even again (quiescent)
                return true
            }
            curSequence = sequence
            curSlots = slots
        }
    }
}

This is flat combining, a lock-free technique where the first updater takes responsibility for applying subsequent updates. Here’s how it works:

Even sequence means the StateFlow is quiescent (no update in progress). When an update arrives, it increments the sequence to make it odd, marking an update in progress.

Odd sequence means an update is in progress. If another thread tries to update during this time, it just increments the sequence by 2 (keeping it odd) and returns. The first updater will see the sequence changed and know it needs to do another loop to pick up the new value.

This protocol ensures that only one thread at a time is iterating through collectors, while allowing concurrent updates to be queued without additional synchronization overhead. It's a batching optimization: multiple rapid updates can be coalesced into a single iteration through collectors.
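
To make the protocol tangible, here's a simplified standalone model of the even/odd sequence dance (a hypothetical FlatCombiningNotifier, much reduced from the real StateFlowImpl logic):

class FlatCombiningNotifier(private val notifyCollectors: () -> Unit) {
    private val lock = Any()
    private var sequence = 0 // even = quiescent, odd = update in progress

    fun update() {
        var curSequence: Int
        synchronized(lock) {
            if (sequence and 1 == 0) {
                sequence++ // even -> odd: claim the notifier role
                curSequence = sequence
            } else {
                sequence += 2 // stay odd; the active notifier will loop again
                return
            }
        }
        while (true) {
            notifyCollectors() // runs outside the lock
            synchronized(lock) {
                if (sequence == curSequence) {
                    sequence++ // odd -> even: quiescent again
                    return
                }
                curSequence = sequence // a concurrent update arrived; repeat
            }
        }
    }
}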

StateFlow slot implementation

StateFlow slots are simpler than SharedFlow slots because StateFlow doesn’t need to track an index (there’s only ever one value):

private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
    private val _state = WorkaroundAtomicReference<Any?>(null)

    override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
        if (_state.value != null) return false // not free
        _state.value = NONE // allocated
        return true
    }

    fun makePending() {
        _state.loop { state ->
            when {
                state == null -> return // free slot, skip
                state === PENDING -> return // already pending
                state === NONE -> {
                    if (_state.compareAndSet(state, PENDING)) return
                }
                else -> {
                    // must be a suspended continuation
                    if (_state.compareAndSet(state, NONE)) {
                        (state as CancellableContinuationImpl<Unit>).resume(Unit)
                        return
                    }
                }
            }
        }
    }
}

Each slot can be in one of four states:

  • null: free, available for allocation
  • NONE: allocated but no pending value or suspension
  • PENDING: has a pending value to collect
  • CancellableContinuationImpl<Unit>: suspended, waiting for a value

The state transitions are carefully designed to be lock-free using CAS (compare-and-swap) operations. This design pays off in performance: makePending() is called inside the update loop while holding the StateFlow lock, but the CAS operations never block.

The collection logic demonstrates StateFlow's conflation behavior:

override suspend fun collect(collector: FlowCollector<T>): Nothing {
    val slot = allocateSlot()
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        var oldState: Any? = null
        while (true) {
            val newState = _state.value
            collectorJob?.ensureActive()
            // Conflate using equality
            if (oldState == null || oldState != newState) {
                collector.emit(NULL.unbox(newState))
                oldState = newState
            }
            if (!slot.takePending()) {
                slot.awaitPending()
            }
        }
    } finally {
        freeSlot(slot)
    }
}

The conflation check oldState != newState is where equality-based conflation happens. If the value didn't change (according to equals()), the collector doesn't re-emit. This is the core of StateFlow's "distinct until changed" behavior.

The loop structure is optimized: takePending() first tries a fast path without suspending. Only if there’s no pending value does it call awaitPending() to suspend:

suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@{ cont ->
    assert { _state.value !is CancellableContinuationImpl<*> }
    if (_state.compareAndSet(NONE, cont)) return@sc
    assert { _state.value === PENDING }
    cont.resume(Unit)
}

This avoids suspension if a value became available between the takePending() check and the awaitPending() call, gracefully handling yet another race condition.

The subscription count mechanism

Both SharedFlow and StateFlow expose a subscriptionCount property, but its implementation is subtle. You might expect it to be a simple StateFlow, but it’s not:

private class SubscriptionCountStateFlow(initialValue: Int) : StateFlow<Int>,
    SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST)
{
    init { tryEmit(initialValue) }

    override val value: Int
        get() = synchronized(this) { lastReplayedLocked }

    fun increment(delta: Int) = synchronized(this) {
        tryEmit(lastReplayedLocked + delta)
    }
}

It’s a SharedFlow masquerading as a StateFlow. Why? The comment (from the kotlinx.coroutines library) in the code explains:

The importance of non-conflating can be demonstrated with the following example:

val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42)
println(shared.first())
yield()
println(shared.first())

If the flow is shared within the same dispatcher, the `SharingStarted.Lazily` will
never be able to start the source: `first` sees the initial value and immediately
unsubscribes, leaving the asynchronous `SharingStarted` with conflated zero.

If subscriptionCount were a regular StateFlow, it would conflate 0 -> 1 -> 0 into just 0, and listeners wouldn't see the intermediate 1. By using a SharedFlow with BufferOverflow.DROP_OLDEST and an unlimited buffer, every subscription change is delivered; this is critical for SharingStarted strategies that need to observe every subscriber arrival.
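
This matters whenever you rely on subscriber-aware sharing. As a hedged example (the sharedTicker helper is hypothetical), SharingStarted.WhileSubscribed collects subscriptionCount and must see every 0 -> 1 -> 0 transition to start and stop the upstream correctly:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun CoroutineScope.sharedTicker(): SharedFlow<Long> =
    flow {
        var tick = 0L
        while (true) {
            emit(tick++)
            delay(1_000)
        }
    }.shareIn(this, SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000))
    // If 0 -> 1 -> 0 were conflated to just 0, a short-lived subscriber
    // could never trigger the ticker to start.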

Collector index management and resumption

The most complex part of SharedFlow is managing collector indices when values are emitted and consumed. Let’s examine updateCollectorIndexLocked, which is called when a collector frees its slot or advances its index:

internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
    assert { oldIndex >= minCollectorIndex }
    if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes

    val head = head
    var newMinCollectorIndex = head + bufferSize
    if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++

    // Find new minimum collector index
    forEachSlotLocked { slot ->
        if (slot.index >= 0 && slot.index < newMinCollectorIndex) {
            newMinCollectorIndex = slot.index
        }
    }

    assert { newMinCollectorIndex >= minCollectorIndex }
    if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES

    // Calculate how many emitters can be resumed
    var newBufferEndIndex = bufferEndIndex
    val maxResumeCount = if (nCollectors > 0) {
        val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
        minOf(queueSize, bufferCapacity - newBufferSize0)
    } else {
        queueSize
    }

    // Resume emitters and move their values to the buffer
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    if (maxResumeCount > 0) {
        resumes = arrayOfNulls(maxResumeCount)
        var resumeCount = 0
        val buffer = buffer!!
        for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
            val emitter = buffer.getBufferAt(curEmitterIndex)
            if (emitter !== NO_VALUE) {
                emitter as Emitter
                resumes[resumeCount++] = emitter.cont
                buffer.setBufferAt(curEmitterIndex, NO_VALUE)
                buffer.setBufferAt(newBufferEndIndex, emitter.value)
                newBufferEndIndex++
                if (resumeCount >= maxResumeCount) break
            }
        }
    }

    // Update buffer state
    updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
    cleanupTailLocked()
    if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
    return resumes
}

This function does multiple things atomically:

  1. Recomputes minCollectorIndex: Finds the slowest remaining collector
  2. Calculates buffer space freed: As the minimum advances, buffer entries before the new minimum are no longer needed
  3. Resumes queued emitters: When space is freed, suspended emitters can resume and their values move from the queue to the buffer
  4. Updates all indices: Adjusts replayIndex, minCollectorIndex, bufferSize, and queueSize consistently
  5. Resumes waiting collectors: If emitters added values to the buffer, collectors waiting for values can resume

All of this happens under a single lock acquisition, ensuring atomicity. The continuations are then resumed outside the lock to prevent deadlocks.

Performance characteristics and design trade-offs

The implementation makes several performance trade-offs that are worth understanding:

StateFlow updates are O(N) in the number of collectors. Every update iterates through all slots to call makePending(). For most real-world use cases with a small number of collectors (usually 1-3), this is fine and simpler than more complex data structures.

SharedFlow emissions are also O(N). Finding slots to resume requires iterating through all slots. Again, for typical use cases, this linear scan is faster than maintaining a heap or skip list of indices.

Allocation optimization over CPU optimization. The circular buffer, slot reuse, and flat combining all prioritize reducing allocations. In a garbage-collected environment like the JVM, this is often more important than raw CPU efficiency.

Lock-based synchronization. Both StateFlow and SharedFlow use synchronized blocks rather than pure lock-free algorithms. This is pragmatic: locks are fast on modern JVMs, and the uncontended case (most updates) is very cheap. The complex lock-free algorithms required for truly concurrent updates would be harder to maintain and potentially slower due to CAS retry loops.

Resume outside the lock. This is perhaps the most important pattern of all. Resuming continuations can trigger arbitrary user code, so it must happen outside any locks. The two-phase pattern (collect continuations under the lock, resume them outside it) appears throughout the codebase.

Conclusion

In this article, you’ve explored the internal mechanisms of Flow, StateFlow, and SharedFlow, examining the sophisticated engineering that makes them both performant and correct. The internal implementations showcase pragmatic engineering choices: simple algorithms optimized for common cases, careful attention to memory allocation, and graceful handling of concurrent access without over-engineering for rare edge cases.

Understanding these internals helps you make better decisions about when to use StateFlow vs SharedFlow, how buffer sizes affect performance, and what the performance characteristics of different configurations are. Whether you’re building reactive UI layers with StateFlow, implementing event buses with SharedFlow, or designing custom Flow operators, this knowledge provides the foundation for writing correct, performant coroutine-based code.

As always, happy coding!

— Jaewoong
