Date: 2025-10-15

In this article, you'll dive deep into the internal mechanisms of Flow, StateFlow, and SharedFlow, exploring how they work under the hood.

Understanding the core abstraction: What makes Flow special
The Flow interface and AbstractFlow implementation
Hot vs Cold: SharedFlow enters the picture
The SharedFlow architecture
The internal buffer architecture
The slot allocation system
The emission flow in SharedFlow
StateFlow: A specialized SharedFlow
StateFlow slot implementation
The subscription count mechanism
Collector index management and resumption
Performance characteristics and design trade-offs
Conclusion

Kotlin Coroutines introduced Flow as a reactive stream abstraction, fundamentally changing how Android developers handle asynchronous data streams. While LiveData served the Android community for years, Flow emerged as a more flexible, coroutine-native solution. But the power of Flow goes far beyond its clean API: it’s in the sophisticated internal machinery that makes it both performant and safe.

Understanding the core abstraction: What makes Flow special

At its heart, Flow is an asynchronous data stream that sequentially emits values. But what distinguishes Flow from other reactive streams is its adherence to two fundamental properties: context preservation and exception transparency.

Context preservation means a flow encapsulates its own execution context and never leaks it downstream. You can’t accidentally emit from the wrong dispatcher.

val myFlow = flow {
    emit(1)
    coroutineScope {
        emit(2)
    }
}

Exception transparency ensures that when an emission fails, the flow immediately stops and propagates the exception. This makes flows predictable and composable: you can reason about error handling locally without worrying about upstream flows swallowing exceptions.

These properties are not just guidelines; they’re enforced at runtime. The Flow API is carefully designed to prevent violations, and attempting to break these rules results in an exception. The flow above is fine, because coroutineScope keeps emitting from the same coroutine. But if you wrap an emit call in withContext(Dispatchers.IO) inside the flow builder and collect the flow, you’ll face a “java.lang.IllegalStateException: Flow invariant is violated” exception.
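To make the rule concrete, here is a minimal sketch that assumes kotlinx.coroutines is on the classpath. The function names collectAllowed and collectViolating are made up for this illustration. It contrasts an emission inside coroutineScope, which stays in the collecting coroutine, with an emission inside withContext, which switches dispatchers.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Emitting inside coroutineScope stays in the collector's coroutine, so it is allowed.
fun collectAllowed(): List<Int> = runBlocking {
    flow {
        emit(1)
        coroutineScope { emit(2) }
    }.toList()
}

// Emitting inside withContext changes the dispatcher, so the context check rejects it.
fun collectViolating(): String = runBlocking {
    try {
        flow {
            withContext(Dispatchers.Default) { emit(1) }
        }.toList()
        "no exception"
    } catch (e: IllegalStateException) {
        "IllegalStateException"
    }
}
```

Calling collectAllowed() should return both values, while collectViolating() should report the IllegalStateException raised by the invariant check. When you really need another dispatcher for the upstream work, the flowOn operator is the supported way to get it.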

The Flow interface and AbstractFlow implementation

If you examine the Flow interface, it’s pretty minimal:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

A single suspending function. That’s it. This simplicity is deliberate: Flow is cold by nature, meaning each call to collect starts a new, independent execution of the producer. The complexity lies in how this simple interface is implemented.
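A small sketch of what “cold” means in practice, assuming kotlinx.coroutines is available. The function name coldFlowRuns is hypothetical. It counts how many times the builder block runs when the same flow is collected twice.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns how often the producer block ran, plus the values of the last collection.
fun coldFlowRuns(): Pair<Int, List<Int>> = runBlocking {
    var runs = 0
    val numbers = flow {
        runs++ // executed once per collection, not once per flow instance
        emit(1)
        emit(2)
    }
    val first = numbers.toList()
    val second = numbers.toList()
    check(first == second) // each collector sees the full sequence
    runs to second
}
```

The producer block runs twice here, once for each terminal toList() call, which is exactly the behavior hot flows are designed to avoid.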

The recommended way to implement Flow is through AbstractFlow , which provides critical safety guardrails:

@ExperimentalCoroutinesApi
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    public final override suspend fun collect(collector: FlowCollector<T>) {
        // Wrap the user's collector so every emission is checked
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

The AbstractFlow implementation wraps every collector with a SafeCollector. This wrapper is the enforcer of context preservation: it tracks the coroutine context and throws IllegalStateException if you try to emit from a different context. This is how the flow builder prevents withContext calls during emission.

When you call emit() on a SafeCollector, it performs a context check before allowing the emission, as the internal code below shows:

override suspend fun emit(value: T) {
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            // The inner emit checks the context before passing the value downstream
            emit(uCont, value)
        } catch (e: Throwable) {
            // Remember that this emission failed, and in which context
            lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
            throw e
        }
    }
}

The SafeCollector compares the current coroutine context with the collector’s context. If they don’t match, it knows you’ve switched contexts illegally and throws an exception. This safety is what makes Flow’s context preservation guarantee possible.

Hot vs Cold: SharedFlow enters the picture

While regular Flow is cold (each collector triggers independent execution), many use cases require hot flows: flows that emit values regardless of whether anyone is collecting. This is where SharedFlow comes in.

SharedFlow is fundamentally different from Flow. It’s a broadcast mechanism that shares emitted values among all active collectors. Think of it like a radio station: it broadcasts whether anyone is listening or not (though its behavior adapts based on whether there are subscribers).

The SharedFlow architecture

If you examine the SharedFlow interface, you’ll notice it extends Flow :

public interface SharedFlow<out T> : Flow<T> {
    public val replayCache: List<T>
    override suspend fun collect(collector: FlowCollector<T>): Nothing
}

The key difference? The collect function’s return type is Nothing. This signals that SharedFlow never completes. A call to collect() on a SharedFlow will run forever unless cancelled. This is the essence of hot flows: they’re always active.

The internal buffer architecture

The real sophistication lies in SharedFlowImpl , which manages a complex circular buffer to handle replay cache, buffering, and queued emitters. Let’s examine its structure:

internal open class SharedFlowImpl<T>(
    private val replay: Int,
    private val bufferCapacity: Int,
    private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    private var buffer: Array<Any?>? = null
    // ... remaining fields and methods omitted from this excerpt
}

The comment that accompanies this declaration in the coroutines library source is a goldmine of implementation detail. According to it, the buffer is a circular array that logically contains three regions:

Replay cache (from replayIndex to head + bufferSize): Values that new collectors immediately receive

Buffered values (from head to head + bufferSize): Values available to collectors

Queued emitters (from head + bufferSize to head + totalSize): Suspended emitters waiting for slow collectors

The circular buffer optimization is important for performance. Instead of shifting elements when the head moves forward, the implementation uses modulo arithmetic to wrap around:

private fun Array<Any?>.getBufferAt(index: Long) =
    get(index.toInt() and (size - 1))

private fun Array<Any?>.setBufferAt(index: Long, item: Any?) =
    set(index.toInt() and (size - 1), item)

The and (size - 1) operation is a fast modulo for power-of-two sizes. This is why the buffer is always allocated in powers of two: it turns an expensive division into a single bitwise AND.
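The masking trick can be tried in isolation. The sketch below is a simplified model, not the library’s code. MaskedRing and slotOf are hypothetical names, and the class stores plain values only.

```kotlin
// Hypothetical, simplified ring: logical Long indices grow forever,
// and the physical slot is found with a bit mask instead of a modulo.
class MaskedRing(capacity: Int) {
    init {
        require(capacity > 0 && (capacity and (capacity - 1)) == 0) {
            "capacity must be a power of two"
        }
    }

    private val items = arrayOfNulls<Any>(capacity)

    // For power-of-two sizes, (index and (size - 1)) equals (index % size).
    fun slotOf(index: Long): Int = index.toInt() and (items.size - 1)

    operator fun get(index: Long): Any? = items[slotOf(index)]

    operator fun set(index: Long, item: Any?) {
        items[slotOf(index)] = item
    }
}
```

With a capacity of 4, logical index 5 lands in physical slot 1, so writing at index 5 and reading at index 1 hit the same cell. The head can keep advancing without any element ever being shifted.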

The slot allocation system

Both StateFlow and SharedFlow use a slot allocation system to track active collectors. This is managed by AbstractSharedFlow :

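internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
    protected var slots: Array<S?>? = null // allocated on first use
    protected var nCollectors = 0          // number of allocated slots
    private var nextIndex = 0              // where the next free-slot search starts
    // ... remaining members omitted from this excerpt
}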

When a collector starts, it allocates a slot:

protected fun allocateSlot(): S {
    val subscriptionCount: SubscriptionCountStateFlow?
    val slot = synchronized(this) {
        // Create the array lazily, or double it when every slot is taken
        val slots = when (val curSlots = slots) {
            null -> createSlotArray(2).also { slots = it }
            else -> if (nCollectors >= curSlots.size) {
                curSlots.copyOf(2 * curSlots.size).also { slots = it }
            } else {
                curSlots
            }
        }
        var index = nextIndex
        var slot: S
        while (true) {
            // Reuse an existing slot object, or create one for an empty cell
            slot = slots[index] ?: createSlot().also { slots[index] = it }
            index++
            if (index >= slots.size) index = 0
            if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break
        }
        nextIndex = index
        nCollectors++
        subscriptionCount = _subscriptionCount
        slot
    }
    // Update the subscription count outside the lock
    subscriptionCount?.increment(1)
    return slot
}

This allocation strategy has several clever optimizations:

Slot reuse: Slots are never destroyed, only marked as free. When a new collector arrives, it reuses an existing slot rather than allocating a new one. This eliminates allocation churn in scenarios where collectors come and go frequently.

Amortized O(1) allocation: The array doubles in size when full, giving amortized constant-time allocation just like ArrayList .

Predictable next slot search: The nextIndex oracle remembers where the last successful allocation happened, making the next search start from a likely free slot instead of always starting from zero.
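The three ideas above (reuse, doubling, and the nextIndex hint) fit into a small stand-alone model. This is a sketch under simplifying assumptions: Slot, SlotPool, and the allocated flag are invented names, and the real slots carry collector state rather than a boolean.

```kotlin
// Hypothetical model: a slot is free when `allocated` is false.
class Slot {
    var allocated = false
}

class SlotPool {
    private var slots = arrayOfNulls<Slot>(2)
    private var nextIndex = 0 // hint: where the next search starts
    var allocatedCount = 0
        private set
    val capacity: Int get() = slots.size

    @Synchronized
    fun allocate(): Slot {
        // Double the array when every slot is taken (amortized O(1) growth).
        if (allocatedCount >= slots.size) slots = slots.copyOf(2 * slots.size)
        var index = nextIndex
        while (true) {
            // Reuse the slot object in this cell, or create one if the cell is empty.
            val slot = slots[index] ?: Slot().also { slots[index] = it }
            index++
            if (index >= slots.size) index = 0
            if (!slot.allocated) {
                slot.allocated = true
                nextIndex = index
                allocatedCount++
                return slot
            }
        }
    }

    @Synchronized
    fun free(slot: Slot) {
        slot.allocated = false // the object stays in the array for reuse
        allocatedCount--
    }
}
```

Allocating two slots, freeing the first, and allocating again hands back the very same Slot object. Only a further allocation, with both slots taken, grows the array from 2 to 4.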

For SharedFlow, each slot also tracks the collector’s position in the buffer:

internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
    @JvmField
    var index = -1L // position in the stream; -1 means the slot is free
    var cont: Continuation<Unit>? = null // suspended collector waiting for a value

    override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
        if (index >= 0) return false // already in use
        index = flow.updateNewCollectorIndexLocked()
        return true
    }
}

The index field is the important part here: it tracks where this collector is in the stream of values. When a new value is emitted, the SharedFlow checks all slots and resumes the suspended collectors whose index now points at an available value.

The emission flow in SharedFlow

Now, let’s explore what happens when you emit a value to a SharedFlow with tryEmit() :

override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {
        if (tryEmitLocked(value)) {
            // Phase 1: gather the collectors to wake while holding the lock
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    // Phase 2: resume them after the lock is released
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}

The critical observation: emissions happen in two phases, and continuations are resumed outside the lock. This prevents deadlocks when using unconfined dispatchers and is a recurring pattern throughout the coroutines library.
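The two-phase pattern does not depend on coroutines, so it can be modeled with plain callbacks. The sketch below is an illustration only. Notifier, await, publish, and isLockHeldByCurrentThread are hypothetical names, and callbacks stand in for continuations.

```kotlin
// Hypothetical model of "collect under the lock, resume outside the lock".
class Notifier {
    private val lock = Any()
    private val waiters = ArrayList<() -> Unit>()
    var lastValue: Int? = null
        private set

    fun await(callback: () -> Unit) {
        synchronized(lock) { waiters.add(callback) }
    }

    // Exposed only so the behavior can be observed from a callback.
    fun isLockHeldByCurrentThread(): Boolean = Thread.holdsLock(lock)

    fun publish(value: Int): Int {
        // Phase 1: mutate state and snapshot who to wake, all under the lock.
        val toResume: List<() -> Unit> = synchronized(lock) {
            lastValue = value
            val snapshot = waiters.toList()
            waiters.clear()
            snapshot
        }
        // Phase 2: run the callbacks with no lock held, so user code cannot block on it.
        for (callback in toResume) callback()
        return toResume.size
    }
}
```

A callback registered through await can ask isLockHeldByCurrentThread() and will see false, because publish has already left the synchronized block by the time it runs.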

Inside tryEmitLocked , the behavior depends on whether there are active collectors:

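private fun tryEmitLocked(value: T): Boolean {
    // Fast path without collectors: no buffering beyond the replay cache
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
    // Cannot emit now if the buffer is full and blocked by slow collectors
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false    // caller will suspend
            BufferOverflow.DROP_LATEST -> return true // drop the incoming value
            BufferOverflow.DROP_OLDEST -> {}          // enqueue and drop the oldest
        }
    }
    enqueueLocked(value)
    bufferSize++
    if (bufferSize > bufferCapacity) dropOldestLocked()
    // Keep the replay cache no larger than requested
    if (replaySize > replay) {
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    return true
}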

When there are no collectors, SharedFlow takes a fast path:

private fun tryEmitNoCollectorsLocked(value: T): Boolean {
    assert { nCollectors == 0 }
    if (replay == 0) return true // nothing to replay, so the value is dropped
    enqueueLocked(value)
    bufferSize++
    if (bufferSize > replay) dropOldestLocked()
    minCollectorIndex = head + bufferSize
    return true
}

This is an important optimization: when there are no subscribers, SharedFlow only maintains the replay cache. It doesn’t use extra buffer space, and buffer overflow strategies don’t apply; the behavior is effectively BufferOverflow.DROP_OLDEST with a buffer size equal to replay.
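This behavior is visible through the public API. The sketch below assumes kotlinx.coroutines is on the classpath, and the two function names are invented for the illustration.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// With replay = 2 and nobody subscribed, only the two newest values are kept.
fun replayWithoutSubscribers(): List<Int> {
    val shared = MutableSharedFlow<Int>(replay = 2)
    for (i in 1..5) check(shared.tryEmit(i)) // never reports a full buffer
    return shared.replayCache
}

// With the default replay = 0, the value is accepted and then simply forgotten.
fun noReplayWithoutSubscribers(): Pair<Boolean, List<Int>> {
    val shared = MutableSharedFlow<Int>()
    return shared.tryEmit(1) to shared.replayCache
}
```

Note that the default onBufferOverflow here is SUSPEND, yet tryEmit never returns false while there are no subscribers, which matches the fast path described above.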

When tryEmit returns false (buffer is full with SUSPEND strategy), emit() must suspend:

private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitter = synchronized(this) lock@{
        // Recheck under the lock: the buffer may have drained in the meantime
        if (tryEmitLocked(value)) {
            cont.resume(Unit)
            resumes = findSlotsToResumeLocked(resumes)
            return@lock null
        }
        // Still full: park this emitter in the buffer
        Emitter(this, head + totalSize, value, cont).also {
            enqueueLocked(it)
            queueSize++
            if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
        }
    }
    // Outside the lock: register cleanup and resume collectors
    emitter?.let { cont.disposeOnCancellation(it) }
    for (r in resumes) r?.resume(Unit)
}

The suspended emitter is stored directly in the buffer as an Emitter object. When space becomes available (because a slow collector catches up or is cancelled), the SharedFlow will find these Emitter objects and resume them:

for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
    val emitter = buffer.getBufferAt(curEmitterIndex)
    if (emitter !== NO_VALUE) {
        emitter as Emitter
        resumes[resumeCount++] = emitter.cont
        buffer.setBufferAt(curEmitterIndex, NO_VALUE)        // clear the queue cell
        buffer.setBufferAt(newBufferEndIndex, emitter.value) // move the value into the buffer
        newBufferEndIndex++
        if (resumeCount >= maxResumeCount) break
    }
}

This is great: queued emitters aren’t stored in a separate data structure. They live in the same circular buffer as values, marked by their type. This unifies the buffer management logic.

StateFlow: A specialized SharedFlow

StateFlow is essentially a SharedFlow with a specific configuration and additional constraints. Looking at the MutableStateFlow factory function:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> =
    StateFlowImpl(value ?: NULL)

StateFlowImpl extends AbstractSharedFlow and implements MutableStateFlow (and through it, StateFlow and SharedFlow):

private class StateFlowImpl<T>(
    initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialState)
    private var sequence = 0
    // ... remaining members omitted from this excerpt
}

The key difference from SharedFlow : StateFlow always has exactly one value, and it uses equality-based conflation. The _state is an atomic reference, and the sequence counter implements a clever synchronization protocol.

StateFlow’s update mechanism is more complicated than a simple lock:

private fun updateState(expectedState: Any?, newState: Any): Boolean {
    var curSequence: Int
    var curSlots: Array<StateFlowSlot?>?
    synchronized(this) {
        val oldState = _state.value
        if (expectedState != null && oldState != expectedState) return false // CAS support
        if (oldState == newState) return true // nothing changes
        _state.value = newState
        curSequence = sequence
        if (curSequence and 1 == 0) {
            // Even: no update in progress, so this caller becomes the notifier
            curSequence++
            sequence = curSequence
        } else {
            // Odd: another update is notifying; signal it and return
            sequence = curSequence + 2
            return true
        }
        curSlots = slots
    }
    // Notify collectors outside the lock, looping until no new update arrived
    while (true) {
        curSlots?.forEach { it?.makePending() }
        synchronized(this) {
            if (sequence == curSequence) {
                sequence = curSequence + 1 // make it even again
                return true
            }
            curSequence = sequence
            curSlots = slots
        }
    }
}

This is a simple form of flat combining: the first updater takes responsibility for notifying collectors on behalf of any updaters that arrive while it is working. The state itself is still changed under the lock; only the notification loop runs outside it. Here’s how it works:

Even sequence means the StateFlow is quiescent (no update in progress). When an update arrives, it increments the sequence to make it odd, marking an update in progress.

Odd sequence means an update is in progress. If another thread tries to update during this time, it just increments the sequence by 2 (keeping it odd) and returns. The first updater will see the sequence changed and know it needs to do another loop to pick up the new value.

This protocol ensures that only one thread at a time is iterating through collectors, while allowing concurrent updates to be queued without additional synchronization overhead. It’s a batching optimization: multiple rapid updates can be coalesced into a single iteration through collectors.
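The even/odd protocol can be modeled without coroutines. In the sketch below, SequencedState, onNotify, and notifyRounds are hypothetical names, and an update arriving “concurrently” is simulated deterministically by calling update() from inside the notification callback.

```kotlin
// Hypothetical model of the sequence protocol; onNotify stands in for waking collectors.
class SequencedState(initial: Int, private val onNotify: (SequencedState) -> Unit) {
    private val lock = Any()
    private var state = initial
    private var sequence = 0 // even = quiescent, odd = a notifier is active
    var notifyRounds = 0
        private set
    val value: Int get() = synchronized(lock) { state }

    fun update(newState: Int): Boolean {
        var curSequence = synchronized(lock) {
            if (state == newState) return true
            state = newState
            if (sequence and 1 == 0) {
                sequence += 1 // even -> odd: this caller becomes the notifier
                sequence
            } else {
                sequence += 2 // stays odd: tells the active notifier to loop again
                return true
            }
        }
        while (true) {
            notifyRounds++
            onNotify(this) // runs outside the lock
            synchronized(lock) {
                if (sequence == curSequence) {
                    sequence = curSequence + 1 // back to even: done
                    return true
                }
                curSequence = sequence // someone updated meanwhile: go around again
            }
        }
    }
}

// Injects one nested update during the first notification round.
fun demoSequence(): List<Int> {
    val seen = ArrayList<Int>()
    var injected = false
    val model = SequencedState(0) { self ->
        seen += self.value
        if (!injected) {
            injected = true
            self.update(2) // arrives while the notifier is active
        }
    }
    model.update(1)
    return seen
}
```

In demoSequence the nested update(2) returns immediately after bumping the sequence by 2, and the outer call performs a second notification round that observes the new value.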

StateFlow slot implementation

StateFlow slots are simpler than SharedFlow slots because StateFlow doesn’t need to track an index (there’s only ever one value):

private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
    private val _state = WorkaroundAtomicReference<Any?>(null)

    override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
        if (_state.value != null) return false // not free
        _state.value = NONE // allocated
        return true
    }

    fun makePending() {
        _state.loop { state ->
            when {
                state == null -> return      // slot is free, skip it
                state === PENDING -> return  // already pending
                state === NONE -> {
                    if (_state.compareAndSet(state, PENDING)) return
                }
                else -> {
                    // A suspended collector: CAS first, since it may be cancelled concurrently
                    if (_state.compareAndSet(state, NONE)) {
                        (state as CancellableContinuationImpl<Unit>).resume(Unit)
                        return
                    }
                }
            }
        }
    }
}

Each slot can be in one of four states:

null: free, available for allocation

NONE: allocated but no pending value or suspension

PENDING: has a pending value to collect

CancellableContinuationImpl<Unit>: suspended, waiting for a value

The state transitions are carefully designed to be lock-free using CAS (compare-and-swap) operations. This matters because makePending() is called from the update loop outside the StateFlow lock, so a collector may suspend, resume, or be cancelled at the same moment. The CAS operations resolve those races without blocking.
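The four-state machine can be sketched with a plain AtomicReference. This is a simplified model, not the library’s class. PendingSlot and park are hypothetical names, and a Runnable stands in for the suspended continuation.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of the slot states:
// null = free, NONE = allocated and idle, PENDING = value waiting, otherwise a parked waiter.
class PendingSlot {
    private companion object {
        val NONE = Any()
        val PENDING = Any()
    }

    private val state = AtomicReference<Any?>(null)

    fun allocate(): Boolean = state.compareAndSet(null, NONE)

    fun makePending() {
        while (true) {
            val cur = state.get()
            when {
                cur == null -> return     // free slot: nothing to notify
                cur === PENDING -> return // already marked
                cur === NONE -> if (state.compareAndSet(cur, PENDING)) return
                else -> if (state.compareAndSet(cur, NONE)) {
                    (cur as Runnable).run() // wake the parked waiter
                    return
                }
            }
            // A failed CAS means another thread changed the state: retry.
        }
    }

    // Fast path: consume a pending mark without parking.
    fun takePending(): Boolean = state.compareAndSet(PENDING, NONE)

    // Returns false if a value became pending first, so the caller should not wait.
    fun park(waiter: Runnable): Boolean = state.compareAndSet(NONE, waiter)
}
```

Calling makePending() twice on an idle slot leaves a single PENDING mark, which is the slot-level reason several rapid updates collapse into one wake-up.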

If you look into the collection logic, it demonstrates StateFlow’s conflation behavior:

override suspend fun collect(collector: FlowCollector<T>): Nothing {
    val slot = allocateSlot()
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        var oldState: Any? = null // null means nothing has been emitted yet
        while (true) {
            // Always read the most recent state
            val newState = _state.value
            collectorJob?.ensureActive()
            // Conflate value emissions using equality
            if (oldState == null || oldState != newState) {
                collector.emit(NULL.unbox(newState))
                oldState = newState
            }
            if (!slot.takePending()) { // fast path without suspending
                slot.awaitPending()    // suspend only when nothing is pending
            }
        }
    } finally {
        freeSlot(slot)
    }
}

The conflation check oldState != newState is where equality-based conflation happens. If the value didn’t change (according to equals()), the collector doesn’t re-emit. This is the core of StateFlow’s “distinct until changed” behavior.

The loop structure is optimized: takePending() first tries a fast path without suspending. Only if there’s no pending value does it call awaitPending() to suspend:

suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@{ cont ->
    assert { _state.value !is CancellableContinuationImpl<*> }
    // Install the continuation and wait, unless a value is already pending
    if (_state.compareAndSet(NONE, cont)) return@sc
    assert { _state.value === PENDING }
    cont.resume(Unit)
}

This avoids suspension if a value became available between the takePending() check and the awaitPending() call, which is another race condition handled gracefully.
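Both kinds of conflation can be observed from the outside. The sketch below assumes kotlinx.coroutines 1.6 or newer on the classpath. conflationDemo is a hypothetical name, and the yield() calls are there only to give the collector a turn on the single-threaded runBlocking event loop.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun conflationDemo(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    // UNDISPATCHED runs the collector right away, up to its first suspension.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }
    state.value = 0          // equal to the current value: ignored
    state.value = 1; yield() // collector gets a turn and sees 1
    state.value = 1; yield() // equal again: ignored
    state.value = 2; yield() // collector sees 2
    state.value = 3          // collector gets no turn before the next write...
    state.value = 4; yield() // ...so it only ever observes 4
    job.cancel()
    seen.toList()
}
```

The collector receives the initial 0, then 1, 2, and 4. The repeated writes of equal values are dropped by the equality check, and 3 is never seen because the collector always reads the latest state when it finally runs.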

The subscription count mechanism

Both SharedFlow and StateFlow expose a subscriptionCount property, but its implementation is subtle. You might expect it to be a simple StateFlow , but it’s not:

private class SubscriptionCountStateFlow(initialValue: Int) :
    StateFlow<Int>,
    SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST) {

    init {
        tryEmit(initialValue)
    }

    override val value: Int
        get() = synchronized(this) { lastReplayedLocked }

    fun increment(delta: Int) = synchronized(this) {
        tryEmit(lastReplayedLocked + delta)
    }
}

It’s a SharedFlow masquerading as a StateFlow . Why? The comment (from the kotlinx.coroutines library) in the code explains:

The importance of non-conflating can be demonstrated with the following example:

    val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42)
    println(shared.first())
    yield()
    println(shared.first())

If the flow is shared within the same dispatcher, the `SharingStarted.Lazily` will never be able to start the source: `first` sees the initial value and immediately unsubscribes, leaving the asynchronous `SharingStarted` with conflated zero.

If subscriptionCount were a regular StateFlow, it could conflate 0 -> 1 -> 0 into just 0, and listeners wouldn’t see the intermediate 1. By using a SharedFlow with a replay of 1, a buffer capacity of Int.MAX_VALUE, and BufferOverflow.DROP_OLDEST, every subscription change is delivered, which is critical for SharingStarted strategies that need to observe every subscriber arrival.

Collector index management and resumption

The most complex part of SharedFlow is managing collector indices when values are emitted and consumed. Let’s examine updateCollectorIndexLocked , which is called when a collector frees its slot or advances its index:

internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
    assert { oldIndex >= minCollectorIndex }
    if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // it was not the slowest collector
    // Start computing the new minimal index of active collectors
    var newMinCollectorIndex = head + bufferSize
    if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
    forEachSlotLocked { slot ->
        if (slot.index >= 0 && slot.index < newMinCollectorIndex) {
            newMinCollectorIndex = slot.index
        }
    }
    assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
    if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes
    var newBufferEndIndex = bufferEndIndex // grows as waiting emitters are resumed
    val maxResumeCount = if (nCollectors > 0) {
        // Resume only as many emitters as fit into the freed buffer space
        val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
        minOf(queueSize, bufferCapacity - newBufferSize0)
    } else {
        queueSize // no collectors left: resume every waiting emitter
    }
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val newQueueEndIndex = newBufferEndIndex + queueSize
    if (maxResumeCount > 0) {
        resumes = arrayOfNulls(maxResumeCount)
        var resumeCount = 0
        val buffer = buffer!!
        for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
            val emitter = buffer.getBufferAt(curEmitterIndex)
            if (emitter !== NO_VALUE) {
                emitter as Emitter
                resumes[resumeCount++] = emitter.cont
                buffer.setBufferAt(curEmitterIndex, NO_VALUE)
                buffer.setBufferAt(newBufferEndIndex, emitter.value)
                newBufferEndIndex++
                if (resumeCount >= maxResumeCount) break
            }
        }
    }
    // ... computation of newReplayIndex omitted from this excerpt
    updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
    cleanupTailLocked()
    // Wake suspended collectors if any emitters were resumed here
    if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
    return resumes
}

This function does multiple things atomically:

Recomputes minCollectorIndex: Finds the slowest remaining collector

Calculates buffer space freed: As the minimum advances, buffer entries before the new minimum are no longer needed

Resumes queued emitters: When space is freed, suspended emitters can resume and their values move from the queue to the buffer

Updates all indices: Adjusts replayIndex, minCollectorIndex, bufferSize, and queueSize consistently

Resumes waiting collectors: If emitters added values to the buffer, collectors waiting for values can resume

All of this happens under a single lock acquisition, ensuring atomicity. The continuations are then resumed outside the lock to prevent deadlocks.
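The first step, finding the slowest remaining collector, is a plain linear scan. The helper below is a hypothetical, heavily reduced model. It takes the slot indices as a LongArray, uses -1 for a free slot as SharedFlowSlot does, and ignores the replay cache and the synchronous special case.

```kotlin
// Hypothetical helper: returns the smallest index among allocated slots,
// capped at the end of the buffered values when every collector is caught up.
fun newMinCollectorIndex(head: Long, bufferSize: Int, slotIndices: LongArray): Long {
    var min = head + bufferSize // default: nobody is behind the buffer end
    for (index in slotIndices) {
        // index < 0 marks a free slot, which must not hold the minimum back
        if (index >= 0 && index < min) min = index
    }
    return min
}
```

With head = 10, bufferSize = 4, and slot indices 12, -1, and 11, the new minimum is 11. If all slots are free, it falls back to 14, the end of the buffered region.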

Performance characteristics and design trade-offs

The implementation makes several performance trade-offs that are worth understanding:

StateFlow updates are O(N) in the number of collectors. Every update iterates through all slots to call makePending() . For most real-world use cases with a small number of collectors (usually 1-3), this is fine and simpler than more complex data structures.

SharedFlow emissions are also O(N). Finding slots to resume requires iterating through all slots. Again, for typical use cases, this linear scan is faster than maintaining a heap or skip list of indices.

Allocation optimization over CPU optimization. The circular buffer, slot reuse, and flat combining all prioritize reducing allocations. In a garbage-collected environment like the JVM, this is often more important than raw CPU efficiency.

Lock-based synchronization. Both StateFlow and SharedFlow use synchronized blocks rather than pure lock-free algorithms. This is pragmatic: locks are fast on modern JVMs, and the uncontended case (most updates) is very cheap. The complex lock-free algorithms required for truly concurrent updates would be harder to maintain and potentially slower due to CAS retry loops.

Resume outside the lock. This is as much a correctness requirement as an optimization. Resuming continuations can trigger arbitrary user code, so it must happen outside any locks. The two-phase pattern (collect resumes under lock, resume outside lock) appears throughout the codebase.

Conclusion

In this article, you’ve explored the internal mechanisms of Flow , StateFlow , and SharedFlow , examining the sophisticated engineering that makes them both performant and correct. The internal implementations showcase pragmatic engineering choices: simple algorithms optimized for common cases, careful attention to memory allocation, and graceful handling of concurrent access without over-engineering for rare edge cases.

Understanding these internals helps you make better decisions about when to use StateFlow vs SharedFlow , how buffer sizes affect performance, and what the performance characteristics of different configurations are. Whether you’re building reactive UI layers with StateFlow , implementing event buses with SharedFlow , or designing custom Flow operators, this knowledge provides the foundation for writing correct, performant coroutine-based code.

As always, happy coding!

— Jaewoong