Cold Flows, SharedFlow, and StateFlow
Cold Flow Internals
When you write flow { emit(value) }, you’re creating a SafeFlow — an implementation of Flow that wraps your lambda and enforces context preservation. Here’s the actual internal structure, stripped to its essence:
// Simplified from kotlinx.coroutines source
public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
    SafeFlow(block)

private class SafeFlow<T>(
    private val block: suspend FlowCollector<T>.() -> Unit
) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}
The block lambda captures the FlowCollector as a receiver and executes within the collector’s context. Each call to collect invokes block from scratch. This is why cold flows are cold — the lambda is stored, not executed, until someone collects.
Notice the consequence: if you put a println("Starting") at the top of your flow builder, it prints once per collector.
val numbers = flow {
    println("Flow started") // Prints every time someone collects
    emit(1)
    emit(2)
}
numbers.collect { } // "Flow started"
numbers.collect { } // "Flow started" again
If you’re coming from RxJava, this is Observable.defer behavior — but it’s the default, not something you opt into.
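To make the per-collector re-execution observable without relying on console output, here is a minimal sketch that counts how often the builder block runs. The CountingSource class and its starts counter are hypothetical names introduced only for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a cold flow plus a counter of how many times its block has run.
class CountingSource {
    var starts = 0
        private set

    val numbers: Flow<Int> = flow {
        starts++ // runs once per collect call, never at construction time
        emit(1)
        emit(2)
    }
}

fun main() = runBlocking {
    val source = CountingSource()
    println(source.starts)              // 0: the lambda is stored, not executed
    val first = source.numbers.toList()
    val second = source.numbers.toList()
    println(first)                      // [1, 2]
    println(second)                     // [1, 2]
    println(source.starts)              // 2: one execution per collector
}
```

Both collectors see the full sequence because each one triggers its own run of the block.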
Building Custom Operators
Every intermediate operator on Flow follows the same pattern: take a Flow<T>, return a Flow<R>, and inside, collect the upstream and emit transformed values downstream.
Let’s build a throttleFirst operator that emits the first value then drops all values within a time window:
fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
    var lastEmitTime = 0L
    collect { value ->
        val currentTime = System.currentTimeMillis()
        if (currentTime - lastEmitTime >= windowDuration) {
            lastEmitTime = currentTime
            emit(value)
        }
    }
}
Compare this with the RxJava equivalent, where you would typically manage a timer and coordinate thread-safe access to the timestamp. Here, collection is sequential: the collect lambda never runs concurrently with itself, so lastEmitTime needs no synchronization.
// Usage
clickFlow()
    .throttleFirst(300)
    .collect { handleClick(it) }
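Because the operator above reads the wall clock directly, it is awkward to test. One possible variation, sketched here under the assumption that you are free to change the signature, injects the clock as a parameter. The names throttleFirstWithClock, now, and throttleDemo are hypothetical and exist only for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same pattern as throttleFirst, but the time source is a parameter (an assumption
// added for testability, not part of the original operator).
fun <T> Flow<T>.throttleFirstWithClock(
    windowMillis: Long,
    now: () -> Long
): Flow<T> = flow {
    var lastEmitTime: Long? = null // null means nothing has been emitted yet
    collect { value ->
        val currentTime = now()
        val last = lastEmitTime
        if (last == null || currentTime - last >= windowMillis) {
            lastEmitTime = currentTime
            emit(value)
        }
    }
}

fun throttleDemo(): List<String> = runBlocking {
    var fakeTime = 0L
    // Each element carries the fake timestamp at which it "arrives".
    flowOf("a" to 0L, "b" to 100L, "c" to 299L, "d" to 300L, "e" to 650L)
        .onEach { (_, at) -> fakeTime = at }
        .throttleFirstWithClock(windowMillis = 300) { fakeTime }
        .toList()
        .map { it.first }
}

fun main() {
    println(throttleDemo()) // [a, d, e]
}
```

With a 300 ms window, "b" and "c" fall inside the window opened by "a", while "d" arrives exactly when it closes and opens a new one.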
Intermediate Operators
map and filter — The Familiar Ones
These work exactly as you’d expect from Sequences, with the addition that the transformation can be a suspend function:
usersFlow
    .filter { user -> user.isActive }
    .map { user -> userRepository.fetchDetails(user.id) } // suspend call inside map
    .collect { details -> display(details) }
In Java Streams or RxJava, calling a blocking or async operation inside map requires special handling (blocking the thread, or switching to flatMap with a new Observable). In Flow, map’s lambda is already a suspend function — async operations compose naturally.
transform — The Power Tool
While map emits exactly one value per input, transform gives you full control over how many values to emit (zero, one, or many):
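// The parameter is named mapper so it is not confused with the transform operator.
fun <T, R : Any> Flow<T>.mapNotNull(mapper: suspend (T) -> R?): Flow<R> =
    transform { value ->
        val result = mapper(value)
        if (result != null) emit(result) // zero or one emission per input
    }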
You can emit multiple times:
flowOf("Hello World", "Kotlin Flows")
    .transform { sentence ->
        for (word in sentence.split(" ")) {
            emit(word)
        }
    }
    .collect { println(it) }
// Prints Hello, World, Kotlin, Flows (one per line)
Think of transform as the flatMap equivalent for synchronous expansion, but without the overhead of creating a new Flow per element.
FlatMap Variants: Three Strategies for Nested Flows
When each element of your flow produces another flow, you need a flatMap. Kotlin provides three distinct strategies:
flatMapConcat — Sequential. Collects each inner flow to completion before starting the next:
fun getUserPosts(userId: String): Flow<Post> = flow {
    val posts = api.fetchPosts(userId)
    posts.forEach { emit(it) }
}

userIdsFlow
    .flatMapConcat { userId -> getUserPosts(userId) }
    .collect { post -> display(post) }
// User 1's posts, then user 2's posts, then user 3's...
Use when order matters and you can’t process the next item until the previous finishes.
flatMapMerge — Concurrent. Collects inner flows simultaneously (default concurrency: 16):
userIdsFlow
    .flatMapMerge(concurrency = 4) { userId ->
        getUserPosts(userId)
    }
    .collect { post -> display(post) }
// Posts from multiple users arrive interleaved
Use for parallelizing independent requests. The concurrency parameter limits how many inner flows run at once — this is the equivalent of RxJava’s flatMap with maxConcurrency.
flatMapLatest — Cancels previous. When a new element arrives upstream, the current inner flow is cancelled:
searchQueryFlow
    .flatMapLatest { query ->
        searchApi.search(query) // Cancelled if user types a new character
    }
    .collect { results -> displayResults(results) }
This is the switchMap from RxJava. Use it for search-as-you-type, where you only care about the result of the latest query.
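The three strategies are easiest to compare on the same input. The sketch below uses two hypothetical flows, ids() (emits every 20 ms) and work(id) (takes about 200 ms per id), chosen so that a new id always arrives while the previous inner flow is still running. The opt-in annotations are included because these operators have been marked experimental or preview depending on the library version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical upstream: a new id every 20 ms.
fun ids(): Flow<Int> = flow {
    for (id in 1..3) {
        emit(id)
        delay(20)
    }
}

// Hypothetical inner flow: slow enough that the next id arrives mid-flight.
fun work(id: Int): Flow<String> = flow {
    emit("$id:start")
    delay(200)
    emit("$id:end")
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun concatResult(): List<String> = runBlocking {
    ids().flatMapConcat { work(it) }.toList()
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun mergeResult(): List<String> = runBlocking {
    ids().flatMapMerge(concurrency = 3) { work(it) }.toList()
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun latestResult(): List<String> = runBlocking {
    ids().flatMapLatest { work(it) }.toList()
}

fun main() {
    println(concatResult()) // [1:start, 1:end, 2:start, 2:end, 3:start, 3:end]
    println(mergeResult())  // all six values, interleaved across ids
    println(latestResult()) // [1:start, 2:start, 3:start, 3:end]
}
```

Only the last inner flow survives under flatMapLatest, which is why "1:end" and "2:end" never appear.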
Error Handling
Error handling in Flow has one critical asymmetry you must understand: catch only catches upstream exceptions.
flow {
    emit(1)
    throw RuntimeException("upstream error")
    emit(2) // never reached
}
    .catch { e -> println("Caught: $e") } // ✅ Catches the upstream error
    .map { it * 2 }
    .collect { println(it) }
But if the exception happens downstream:
flowOf(1, 2, 3)
    .catch { e -> println("Caught: $e") } // ❌ Won't catch
    .collect { value ->
        if (value == 2) throw RuntimeException("downstream error")
        println(value)
    }
// Crashes: catch doesn't see downstream exceptions
The fix: move the downstream logic into onEach (which is upstream from collect) and use a simple collect():
flowOf(1, 2, 3)
    .onEach { value ->
        if (value == 2) throw RuntimeException("error in processing")
        println(value)
    }
    .catch { e -> println("Caught: $e") } // ✅ Now it catches
    .collect()
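If the logic has to stay inside the collect lambda, an ordinary try/catch around the terminal call also handles the exception. The sketch below records what happens in a list so the behavior can be checked; collectWithTryCatch is a hypothetical name used only here.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

fun collectWithTryCatch(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        flowOf(1, 2, 3)
            // Never invoked in this example: nothing upstream of it throws.
            .catch { e -> log += "catch operator: ${e.message}" }
            .collect { value ->
                if (value == 2) throw IllegalStateException("downstream error")
                log += "collected $value"
            }
    } catch (e: IllegalStateException) {
        // The downstream exception propagates out of collect like any other exception.
        log += "try/catch: ${e.message}"
    }
    log
}

fun main() {
    println(collectWithTryCatch()) // [collected 1, try/catch: downstream error]
}
```

The catch operator stays silent, confirming that it only observes exceptions thrown above it in the chain.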
retry and retryWhen
retry re-executes the entire upstream flow when an exception occurs:
fetchDataFlow()
    .retry(3) { cause ->
        cause is IOException // only retry on IO errors
    }
    .catch { e -> emit(fallbackData) }
    .collect { data -> process(data) }
retryWhen gives you the attempt count:
fetchDataFlow()
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(1000 * (attempt + 1)) // linear backoff: 1s, 2s, 3s
            true // retry
        } else {
            false // give up
        }
    }
    .collect { data -> process(data) }
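For a backoff that doubles on each attempt, one option is to shift the base delay by the attempt number. The following is a sketch under assumptions: FlakySource simulates a source that fails a fixed number of times, and retryWithBackoff is a hypothetical helper built on retryWhen, not a library function.

```kotlin
import java.io.IOException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical flaky source: fails with IOException on the first `failures` collections.
class FlakySource(private val failures: Int) {
    var attempts = 0
        private set

    val data: Flow<String> = flow {
        attempts++
        if (attempts <= failures) throw IOException("attempt $attempts failed")
        emit("payload")
    }
}

// Hypothetical helper: retries IOExceptions with delays of base, 2x base, 4x base, ...
fun <T> Flow<T>.retryWithBackoff(maxRetries: Long, baseDelayMillis: Long): Flow<T> =
    retryWhen { cause, attempt ->
        if (cause is IOException && attempt < maxRetries) {
            delay(baseDelayMillis shl attempt.toInt()) // attempt is zero-based
            true
        } else {
            false
        }
    }

fun main() = runBlocking {
    val source = FlakySource(failures = 2)
    println(source.data.retryWithBackoff(maxRetries = 3, baseDelayMillis = 10).toList()) // [payload]
    println(source.attempts) // 3: two failures plus the successful run
}
```

Each retry re-collects the cold upstream from scratch, which is why the attempt counter inside the flow builder increases.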
onCompletion
Acts like a finally block. Executes whether the flow completed normally or exceptionally:
dataFlow
    .onCompletion { cause ->
        if (cause != null) {
            println("Flow failed: $cause")
        } else {
            println("Flow completed successfully")
        }
    }
    .collect { process(it) }
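A small sketch of both outcomes follows. The function completionCause and its failing flag are hypothetical. A catch operator is placed after onCompletion only so the failing case does not abort the example.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

fun completionCause(failing: Boolean): String = runBlocking {
    var report = "not completed"
    flow {
        emit(1)
        if (failing) throw IllegalStateException("boom")
    }
        .onCompletion { cause ->
            // cause is null on normal completion, otherwise the exception that ended the flow
            report = if (cause == null) "completed normally" else "failed: ${cause.message}"
        }
        .catch { } // swallow the exception after onCompletion has observed it
        .collect()
    report
}

fun main() {
    println(completionCause(failing = false)) // completed normally
    println(completionCause(failing = true))  // failed: boom
}
```

Unlike catch, onCompletion does not handle the exception: it observes it and lets it continue downstream.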
SharedFlow: Hot Events Without State
A SharedFlow is hot — it emits regardless of whether anyone is collecting. It’s designed for events: navigation commands, toast messages, error notifications — things that happen once and aren’t “state.”
class EventBus {
    private val _events = MutableSharedFlow<Event>(
        replay = 0, // new subscribers don't get old events
        extraBufferCapacity = 64, // extra buffer slots beyond replay
        onBufferOverflow = BufferOverflow.DROP_OLDEST // when full, drop the oldest event instead of suspending
    )
    val events: SharedFlow<Event> = _events.asSharedFlow()

    suspend fun send(event: Event) {
        _events.emit(event)
    }
}
The three configuration parameters matter:
- replay: How many past values a new collector receives immediately. Set to 0 for fire-and-forget events. Set to 1 if late subscribers need the most recent value.
- extraBufferCapacity: Extra slots on top of replay, so total buffer = replay + extraBufferCapacity. When the buffer is full, emit() suspends (or applies onBufferOverflow).
- onBufferOverflow: SUSPEND (default, applies back-pressure), DROP_OLDEST, or DROP_LATEST.
The RxJava equivalent is PublishSubject (with replay = 0) or ReplaySubject (with replay > 0).
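The effect of replay on a late subscriber can be sketched as follows. The function lateSubscriberSees is hypothetical. It emits one event before anyone subscribes and one after, then returns what the subscriber received.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun lateSubscriberSees(replay: Int): List<String> = runBlocking {
    val events = MutableSharedFlow<String>(
        replay = replay,
        extraBufferCapacity = 64,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    events.emit("early") // no subscribers yet: kept only if replay > 0

    // UNDISPATCHED so the subscription is registered before the next emit.
    val subscriber = async(start = CoroutineStart.UNDISPATCHED) {
        events.take(replay + 1).toList() // replayed values (if any) plus one live event
    }
    events.emit("late")
    subscriber.await()
}

fun main() {
    println(lateSubscriberSees(replay = 0)) // [late]
    println(lateSubscriberSees(replay = 1)) // [early, late]
}
```

With replay = 0 the early event is simply gone, which is the PublishSubject-like behavior described above.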
StateFlow: Hot State With Guarantees
StateFlow is a specialized SharedFlow with specific behavior:
- Always has a value: you must provide an initial value
- Conflated: only the most recent value is kept; intermediate values may be dropped
- distinctUntilChanged: collectors are not notified if the new value equals the current one
class CounterViewModel {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.update { it + 1 } // atomic read-modify-write
    }
}
Compare with RxJava’s BehaviorSubject:
// RxJava: BehaviorSubject
BehaviorSubject<Integer> count = BehaviorSubject.createDefault(0);
count.onNext(count.getValue() + 1); // Read-then-write is not atomic: concurrent callers can lose updates
Integer current = count.getValue(); // Nullable in general: null if the subject was created without a default

// Kotlin: StateFlow
val count = MutableStateFlow(0)
count.update { it + 1 } // Atomic read-modify-write via a compareAndSet retry loop
val current = count.value // Always has a value
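StateFlow is thread-safe. update is implemented as a compareAndSet retry loop, so concurrent read-modify-write updates are never lost. The lambda may run more than once under contention, so keep it free of side effects.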
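A quick way to convince yourself that no increments are lost is to hammer one MutableStateFlow from many coroutines on a multi-threaded dispatcher. This is a minimal sketch; concurrentIncrements and its parameters are hypothetical names for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun concurrentIncrements(workers: Int, perWorker: Int): Int = runBlocking {
    val count = MutableStateFlow(0)
    // withContext returns only after every launched child has finished.
    withContext(Dispatchers.Default) {
        repeat(workers) {
            launch {
                repeat(perWorker) {
                    count.update { current -> current + 1 }
                }
            }
        }
    }
    count.value
}

fun main() {
    println(concurrentIncrements(workers = 100, perWorker = 1_000)) // 100000
}
```

Replacing update with a plain count.value = count.value + 1 would make the final number unpredictable, because the read and the write could interleave across threads.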
Converting Cold to Hot: shareIn and stateIn
You often have a cold flow (like a database query or network poll) and need to share it among multiple collectors without re-executing for each one.
shareIn
val locationUpdates: SharedFlow<Location> = locationProvider
    .getLocationFlow() // cold flow
    .shareIn(
        scope = viewModelScope, // coroutine scope that keeps it alive
        started = SharingStarted.WhileSubscribed(5000), // stop 5s after last subscriber
        replay = 1 // new subscribers get latest location
    )
SharingStarted.WhileSubscribed(stopTimeoutMillis) keeps the upstream flow active while there are collectors and for stopTimeoutMillis after the last collector disappears. A 5-second timeout is long enough to survive a configuration change (like screen rotation) without restarting the upstream.
stateIn
val uiState: StateFlow<UiState> = repository
    .observeData()
    .map { data -> data.toUiState() }
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = UiState.Loading
    )
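To see what sharing buys you, the sketch below counts how many times the upstream block runs when two consumers read from it, with and without shareIn. It makes several assumptions for the sake of a deterministic example: it uses SharingStarted.Lazily instead of WhileSubscribed, a plain CoroutineScope instead of viewModelScope, and a hypothetical function name upstreamStarts.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

fun upstreamStarts(shared: Boolean): Int = runBlocking {
    val starts = AtomicInteger(0)
    val cold = flow {
        starts.incrementAndGet() // counts executions of the upstream block
        emit("location")
    }

    // Stand-in for viewModelScope; cancelled at the end so nothing leaks.
    val scope = CoroutineScope(Dispatchers.Default)
    val source =
        if (shared) cold.shareIn(scope, SharingStarted.Lazily, replay = 1) else cold

    source.first() // first consumer
    source.first() // second consumer: served from the replay cache when shared
    scope.cancel()
    starts.get()
}

fun main() {
    println(upstreamStarts(shared = false)) // 2: each consumer re-runs the cold flow
    println(upstreamStarts(shared = true))  // 1: both consumers share one execution
}
```

For an expensive upstream such as a location provider or a database observer, that difference is the whole point of converting cold to hot.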
Decision Table: Choosing the Right Tool
| Criteria | Flow | SharedFlow | StateFlow |
|---|---|---|---|
| Hot/Cold | Cold | Hot | Hot |
| Has current value | No | No .value; replay cache only if replay > 0 | Always (.value) |
| Multiple collectors | Each gets independent execution | All share same emissions | All share same state |
| Conflation | No | Configurable | Always conflated |
| Equality check | No | No | distinctUntilChanged |
| Best for | Data streams, transformations | Events, notifications | UI state, observable state |
| RxJava equivalent | Observable.defer | PublishSubject / ReplaySubject | BehaviorSubject |
Use Flow as your default. Graduate to SharedFlow when you need to broadcast events. Graduate to StateFlow when you need observable state with a current value.