Skip to main content
kotlin in depth advanced patterns for java engineers

Cold Flows, SharedFlow, and StateFlow

9 min read Chapter 20 of 21

Cold Flow Internals

When you write flow { emit(value) }, you’re creating a SafeFlow — an implementation of Flow that wraps your lambda and enforces context preservation. Here’s the actual internal structure, stripped to its essence:

// Simplified from kotlinx.coroutines source
public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
    SafeFlow(block)

private class SafeFlow<T>(
    private val block: suspend FlowCollector<T>.() -> Unit
) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

The block lambda captures the FlowCollector as a receiver and executes within the collector’s context. Each call to collect invokes block from scratch. This is why cold flows are cold — the lambda is stored, not executed, until someone collects.

Notice the consequence: if you put a println("Starting") at the top of your flow builder, it prints once per collector.

val flow = flow {
    println("Flow started") // Prints every time someone collects
    emit(1)
    emit(2)
}

flow.collect { } // "Flow started"
flow.collect { } // "Flow started" again

If you’re coming from RxJava, this is Observable.defer behavior — but it’s the default, not something you opt into.

Building Custom Operators

Every intermediate operator on Flow follows the same pattern: take a Flow<T>, return a Flow<R>, and inside, collect the upstream and emit transformed values downstream.

Let’s build a throttleFirst operator that emits the first value then drops all values within a time window:

fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
    var lastEmitTime = 0L
    collect { value ->
        val currentTime = System.currentTimeMillis()
        if (currentTime - lastEmitTime >= windowDuration) {
            lastEmitTime = currentTime
            emit(value)
        }
    }
}

Compare this with the RxJava equivalent — you’d need to manage a Disposable timer and coordinate thread-safe access to the timestamp. Here, structured concurrency guarantees single-threaded access to lastEmitTime within the flow body.

// Usage
clickFlow()
    .throttleFirst(300)
    .collect { handleClick(it) }

Intermediate Operators

map and filter — The Familiar Ones

These work exactly as you’d expect from Sequences, with the addition that the transformation can be a suspend function:

usersFlow
    .filter { user -> user.isActive }
    .map { user -> userRepository.fetchDetails(user.id) } // suspend call inside map
    .collect { details -> display(details) }

In Java Streams or RxJava, calling a blocking or async operation inside map requires special handling (blocking the thread, or switching to flatMap with a new Observable). In Flow, map’s lambda is already a suspend function — async operations compose naturally.

transform — The Power Tool

While map emits exactly one value per input, transform gives you full control over how many values to emit (zero, one, or many):

fun <T> Flow<T>.mapNotNull(transform: suspend (T) -> T?): Flow<T> =
    transform { value ->
        val result = transform(value)
        if (result != null) emit(result)
    }

You can emit multiple times:

flowOf("Hello World", "Kotlin Flows")
    .transform { sentence ->
        for (word in sentence.split(" ")) {
            emit(word)
        }
    }
    .collect { println(it) }
// Hello, World, Kotlin, Flows

Think of transform as the flatMap equivalent for synchronous expansion, but without the overhead of creating a new Flow per element.

FlatMap Variants: Three Strategies for Nested Flows

When each element of your flow produces another flow, you need a flatMap. Kotlin provides three distinct strategies:

flatMapConcat — Sequential. Collects each inner flow to completion before starting the next:

fun getUserPosts(userId: String): Flow<Post> = flow {
    val posts = api.fetchPosts(userId)
    posts.forEach { emit(it) }
}

userIdsFlow
    .flatMapConcat { userId -> getUserPosts(userId) }
    .collect { post -> display(post) }
// User 1's posts, then user 2's posts, then user 3's...

Use when order matters and you can’t process the next item until the previous finishes.

flatMapMerge — Concurrent. Collects inner flows simultaneously (default concurrency: 16):

userIdsFlow
    .flatMapMerge(concurrency = 4) { userId ->
        getUserPosts(userId)
    }
    .collect { post -> display(post) }
// Posts from multiple users arrive interleaved

Use for parallelizing independent requests. The concurrency parameter limits how many inner flows run at once — this is the equivalent of RxJava’s flatMap with maxConcurrency.

flatMapLatest — Cancels previous. When a new element arrives upstream, the current inner flow is cancelled:

searchQueryFlow
    .flatMapLatest { query ->
        searchApi.search(query) // Cancelled if user types a new character
    }
    .collect { results -> displayResults(results) }

This is the switchMap from RxJava. Use it for search-as-you-type, where you only care about the result of the latest query.

Error Handling

Error handling in Flow has one critical asymmetry you must understand: catch only catches upstream exceptions.

flow {
    emit(1)
    throw RuntimeException("upstream error")
    emit(2)
}
.catch { e -> println("Caught: $e") }   // ✅ Catches the upstream error
.map { it * 2 }
.collect { println(it) }

But if the exception happens downstream:

flowOf(1, 2, 3)
    .catch { e -> println("Caught: $e") }   // ❌ Won't catch
    .collect { value ->
        if (value == 2) throw RuntimeException("downstream error")
        println(value)
    }
// Crashes — catch doesn't see downstream exceptions

The fix: move the downstream logic into onEach (which is upstream from collect) and use a simple collect():

flowOf(1, 2, 3)
    .onEach { value ->
        if (value == 2) throw RuntimeException("error in processing")
        println(value)
    }
    .catch { e -> println("Caught: $e") }   // ✅ Now it catches
    .collect()

retry and retryWhen

retry re-executes the entire upstream flow when an exception occurs:

fetchDataFlow()
    .retry(3) { cause ->
        cause is IOException  // only retry on IO errors
    }
    .catch { e -> emit(fallbackData) }
    .collect { data -> process(data) }

retryWhen gives you the attempt count:

fetchDataFlow()
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(1000 * (attempt + 1)) // exponential backoff
            true // retry
        } else {
            false // give up
        }
    }
    .collect { data -> process(data) }

onCompletion

Acts like a finally block. Executes whether the flow completed normally or exceptionally:

dataFlow
    .onCompletion { cause ->
        if (cause != null) {
            println("Flow failed: $cause")
        } else {
            println("Flow completed successfully")
        }
    }
    .collect { process(it) }

SharedFlow: Hot Events Without State

A SharedFlow is hot — it emits regardless of whether anyone is collecting. It’s designed for events: navigation commands, toast messages, error notifications — things that happen once and aren’t “state.”

class EventBus {
    private val _events = MutableSharedFlow<Event>(
        replay = 0,               // new subscribers don't get old events
        extraBufferCapacity = 64,  // buffer before suspending emitters
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val events: SharedFlow<Event> = _events.asSharedFlow()

    suspend fun send(event: Event) {
        _events.emit(event)
    }
}

The three configuration parameters matter:

  • replay: How many past values a new collector receives immediately. Set to 0 for fire-and-forget events. Set to 1 if late subscribers need the most recent value.
  • extraBufferCapacity: Total buffer = replay + extraBufferCapacity. When the buffer is full, emit() suspends (or applies onBufferOverflow).
  • onBufferOverflow: SUSPEND (default — back-pressure), DROP_OLDEST, or DROP_LATEST.

The RxJava equivalent is PublishSubject (with replay = 0) or ReplaySubject (with replay > 0).

StateFlow: Hot State With Guarantees

StateFlow is a specialized SharedFlow with specific behavior:

  • Always has a value — you must provide an initial value
  • Conflated — only the most recent value is kept; intermediate values may be dropped
  • distinctUntilChanged — collectors are not notified if the new value equals the current one
class CounterViewModel {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.update { it + 1 } // atomic read-modify-write
    }
}

Compare with RxJava’s BehaviorSubject:

// RxJava: BehaviorSubject
BehaviorSubject<Integer> count = BehaviorSubject.createDefault(0);
count.onNext(1);  // No atomic update — race condition if called from multiple threads
int current = count.getValue(); // Nullable, might be null before first emission
// Kotlin: StateFlow
val count = MutableStateFlow(0)
count.update { it + 1 }  // Atomic update using CAS loop
val current = count.value // Always non-null, always has a value

StateFlow is thread-safe and uses a CAS (Compare-And-Swap) loop internally for update. No locks, no races.

Converting Cold to Hot: shareIn and stateIn

You often have a cold flow (like a database query or network poll) and need to share it among multiple collectors without re-executing for each one.

shareIn

val locationUpdates: SharedFlow<Location> = locationProvider
    .getLocationFlow()                         // cold flow
    .shareIn(
        scope = viewModelScope,                // coroutine scope that keeps it alive
        started = SharingStarted.WhileSubscribed(5000), // stop 5s after last subscriber
        replay = 1                             // new subscribers get latest location
    )

SharingStarted.WhileSubscribed(stopTimeoutMillis) is a production-tested pattern: the upstream flow stays active while there are collectors and for stopTimeoutMillis after the last collector disappears. The 5-second timeout survives configuration changes (like screen rotation) without restarting the upstream.

stateIn

val uiState: StateFlow<UiState> = repository
    .observeData()
    .map { data -> data.toUiState() }
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = UiState.Loading
    )

Decision Table: Choosing the Right Tool

CriteriaFlowSharedFlowStateFlow
Hot/ColdColdHotHot
Has current valueNoOnly if replay > 0Always
Multiple collectorsEach gets independent executionAll share same emissionsAll share same state
ConflationNoConfigurableAlways conflated
Equality checkNoNodistinctUntilChanged
Best forData streams, transformationsEvents, notificationsUI state, observable state
RxJava equivalentObservable.deferPublishSubject / ReplaySubjectBehaviorSubject

Use Flow as your default. Graduate to SharedFlow when you need to broadcast events. Graduate to StateFlow when you need observable state with a current value.