Channels, Select Expressions, and the Actor Pattern
Channels: Concurrent Queues That Suspend
A Channel is a communication primitive for passing data between coroutines. If you’ve used Java’s BlockingQueue, you already know the concept — but with a critical upgrade: channels suspend instead of block.
// Java: BlockingQueue — blocks the thread
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(10);
// Producer thread — blocks if queue is full
executorService.submit(() -> {
queue.put(new Task("work")); // Thread.sleep under the hood
});
// Consumer thread — blocks if queue is empty
executorService.submit(() -> {
Task task = queue.take(); // Thread blocked, wasting resources
process(task);
});
// Kotlin: Channel — suspends the coroutine, frees the thread
val channel = Channel<Task>(capacity = 10)
// Producer coroutine — suspends if channel is full
launch {
channel.send(Task("work")) // Coroutine suspended, thread available for other work
}
// Consumer coroutine — suspends if channel is empty
launch {
val task = channel.receive() // Suspends, doesn't block
process(task)
}
The difference matters at scale. A BlockingQueue with 1000 blocked producer threads consumes 1000 thread stacks (~1GB of memory with default stack sizes). A channel with 1000 suspended coroutines uses a few megabytes.
Channel Types
Kotlin provides four channel capacity strategies. Each changes the back-pressure behavior between sender and receiver:
Rendezvous (capacity = 0)
val channel = Channel<Int>() // Default: RENDEZVOUS
Sender and receiver must “meet” — send() suspends until a receiver calls receive(), and vice versa. This is a direct handoff with zero buffering. Use when you want tight coupling between producer and consumer, ensuring the producer doesn’t get ahead.
val channel = Channel<Int>()
launch {
println("Sending 1")
channel.send(1) // Suspends here until someone receives
println("Sent 1") // Only prints after receive() below executes
channel.send(2)
channel.close()
}
launch {
delay(1000) // Producer is suspended for 1 second
println("Received: ${channel.receive()}")
println("Received: ${channel.receive()}")
}
Buffered (capacity = N)
val channel = Channel<Int>(capacity = 64)
The sender can fire up to N elements without waiting. Once the buffer fills, send() suspends until the receiver drains some elements. This is the most common choice for decoupling producer speed from consumer speed.
val channel = Channel<Int>(capacity = 3)
launch {
for (i in 1..5) {
println("Sending $i")
channel.send(i) // Won't suspend until buffer (3) is full
println("Sent $i")
}
channel.close()
}
launch {
delay(500) // Let producer fill the buffer
for (value in channel) {
println("Received: $value")
delay(200)
}
}
// Output shows sends 1-3 happen immediately, then send 4 suspends
Conflated (capacity = CONFLATED)
val channel = Channel<Int>(Channel.CONFLATED)
Buffer of 1. When a new value arrives and the buffer is occupied, the old value is dropped. The sender never suspends. Use when only the latest value matters — sensor readings, real-time pricing, cursor position.
val channel = Channel<Int>(Channel.CONFLATED)
launch {
channel.send(1)
channel.send(2)
channel.send(3) // Only this survives if receiver hasn't consumed yet
}
launch {
delay(100)
println(channel.receive()) // Prints 3 — values 1 and 2 were dropped
}
Unlimited (capacity = UNLIMITED)
val channel = Channel<Int>(Channel.UNLIMITED)
The sender never suspends. The internal buffer grows without bound. Use this with caution — a fast producer paired with a slow consumer will eat memory until you OOM. This is your escape hatch when you absolutely cannot allow back-pressure.
Channels vs Flows: The Decision Boundary
This distinction trips up developers who’ve only used Flow:
| Property | Channel | Flow |
|---|---|---|
| Temperature | Hot — exists independently of consumers | Cold — executes per collector |
| Distribution | Fan-out: each element delivered to one receiver | Broadcast: each element delivered to all collectors |
| Lifecycle | Exists until closed or scope cancelled | Exists during collection |
| Back-pressure | Via buffer capacity and suspension | Via suspension in collect |
| Use case | Communication between coroutines | Data transformation pipelines |
The rule: use Flow when you’re transforming a data stream (filter, map, combine). Use Channel when two coroutines need to hand off work or coordinate.
Fan-Out: Load Balancing Across Workers
When multiple coroutines receive from the same channel, each element goes to exactly one receiver. This gives you a natural worker pool:
suspend fun processOrders(orders: List<Order>) = coroutineScope {
val channel = Channel<Order>(capacity = 100)
// Producer: feed orders into the channel
launch {
orders.forEach { channel.send(it) }
channel.close()
}
// 5 worker coroutines consuming from the same channel
repeat(5) { workerId ->
launch {
for (order in channel) {
println("Worker $workerId processing order ${order.id}")
processOrder(order) // Each order processed by exactly one worker
}
}
}
}
Compare this with the Java equivalent, which requires an ExecutorService, a BlockingQueue, and careful shutdown coordination:
ExecutorService executor = Executors.newFixedThreadPool(5);
BlockingQueue<Order> queue = new LinkedBlockingQueue<>(100);
// Producer thread
executor.submit(() -> {
for (Order order : orders) {
try { queue.put(order); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
// How do you signal "no more work"? Poison pills? Separate flag?
});
// 5 consumer threads
for (int i = 0; i < 5; i++) {
int workerId = i;
executor.submit(() -> {
while (!Thread.interrupted()) {
try {
Order order = queue.poll(1, TimeUnit.SECONDS);
if (order != null) processOrder(order);
// But how do you know when to stop?
} catch (InterruptedException e) { break; }
}
});
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
The Kotlin version handles graceful shutdown through channel.close() — the for (order in channel) loop terminates naturally when the channel closes.
Fan-In: Merging Multiple Producers
The reverse pattern — multiple coroutines send to one channel:
suspend fun aggregateSensorData() = coroutineScope {
val aggregated = Channel<SensorReading>(capacity = 256)
// Multiple sensor coroutines, each sending to the same channel
launch { temperatureSensor.readings().collect { aggregated.send(it) } }
launch { humiditySensor.readings().collect { aggregated.send(it) } }
launch { pressureSensor.readings().collect { aggregated.send(it) } }
// Single consumer processes all readings in arrival order
launch {
for (reading in aggregated) {
database.store(reading)
}
}
}
Select Expressions: Non-Deterministic Choice
select lets you wait on multiple suspending operations simultaneously and proceed with whichever completes first. If you’ve used Go’s select statement, this is Kotlin’s equivalent.
suspend fun multiplexChannels(
primary: ReceiveChannel<Data>,
fallback: ReceiveChannel<Data>,
timeout: Long
): Data {
return select {
primary.onReceive { data ->
println("Got data from primary")
data
}
fallback.onReceive { data ->
println("Got data from fallback")
data
}
onTimeout(timeout) {
println("Both channels timed out")
Data.empty()
}
}
}
A practical use: implementing a timeout-with-fallback pattern:
suspend fun fetchWithFallback(
primaryChannel: ReceiveChannel<Response>,
cacheChannel: ReceiveChannel<Response>
): Response = select {
primaryChannel.onReceive { it }
onTimeout(3000) {
// Primary didn't respond in 3 seconds, take from cache
cacheChannel.receive()
}
}
select is biased — if multiple clauses are ready simultaneously, the first one declared wins. Keep this in mind when ordering your clauses.
The Actor Pattern: Message-Passing Concurrency
The actor model eliminates shared mutable state by confining state to a single coroutine that processes messages sequentially through a channel. No locks, no synchronization, no @Volatile.
Here’s a complete example — a bank account managed by an actor:
// Define messages as a sealed interface
sealed interface AccountMessage {
data class Deposit(val amount: Long, val response: CompletableDeferred<Long>) : AccountMessage
data class Withdraw(val amount: Long, val response: CompletableDeferred<Boolean>) : AccountMessage
data class GetBalance(val response: CompletableDeferred<Long>) : AccountMessage
}
Now build the actor using a Channel and a launch loop:
fun CoroutineScope.bankAccountActor(
initialBalance: Long = 0L
): Channel<AccountMessage> {
val channel = Channel<AccountMessage>(capacity = 64)
launch {
var balance = initialBalance // Private state — no other coroutine touches this
for (msg in channel) {
when (msg) {
is AccountMessage.Deposit -> {
balance += msg.amount
msg.response.complete(balance)
}
is AccountMessage.Withdraw -> {
if (balance >= msg.amount) {
balance -= msg.amount
msg.response.complete(true)
} else {
msg.response.complete(false)
}
}
is AccountMessage.GetBalance -> {
msg.response.complete(balance)
}
}
}
}
return channel
}
Usage from multiple coroutines — no synchronization needed:
fun main() = runBlocking {
val account = bankAccountActor(initialBalance = 1000)
// 100 concurrent deposits — no race conditions
val jobs = (1..100).map {
launch {
val response = CompletableDeferred<Long>()
account.send(AccountMessage.Deposit(10, response))
response.await()
}
}
jobs.joinAll()
val balance = CompletableDeferred<Long>()
account.send(AccountMessage.GetBalance(balance))
println("Final balance: ${balance.await()}") // Always 2000 (1000 + 100*10)
account.close()
}
This is the same concurrency model used by Erlang/OTP and Akka. The actor is a coroutine with a mailbox (channel). Messages are processed sequentially, so balance is never accessed concurrently.
Why Not Use actor { }?
The actor coroutine builder exists in kotlinx.coroutines but is marked @ObsoleteCoroutinesApi:
// ⚠️ Obsolete — do not use in production
@ObsoleteCoroutinesApi
val account = actor<AccountMessage> {
var balance = 0L
for (msg in channel) { /* ... */ }
}
The Kotlin team deprecated this because the API made it too easy to create unbounded actors and didn’t integrate well with structured concurrency patterns. The Channel + launch pattern shown above is the recommended replacement — you control the channel capacity, the coroutine scope, and the lifecycle explicitly.
When Actors Make Sense (and When They Don’t)
Actors add indirection. Every state access becomes a message send, allocation, and await. That overhead is justified when:
- Multiple coroutines modify shared state — the actor serializes access without locks
- State transitions are complex — the actor can validate invariants on every message
- You need audit/logging — every mutation flows through one place
When the overhead isn’t worth it:
// For a simple counter, use AtomicInteger
val counter = AtomicInteger(0)
repeat(100) { launch { counter.incrementAndGet() } }
// For protecting a critical section, use Mutex
val mutex = Mutex()
var sharedList = mutableListOf<String>()
launch {
mutex.withLock {
sharedList.add("item")
}
}
Use AtomicInteger/AtomicReference for simple atomic operations. Use Mutex for short critical sections where you’re guarding an existing data structure. Graduate to the actor pattern when your state management has real logic — validation, computed side effects, or complex transitions that benefit from sequential message processing.
| Approach | Best for | Overhead |
|---|---|---|
Atomic* | Simple counters, flags, references | Minimal — hardware CAS |
Mutex | Protecting existing data structures | Low — coroutine suspension |
| Actor (Channel + launch) | Complex state machines, validated transitions | Medium — message allocation, channel buffering |
StateFlow | Observable state, UI bindings | Low — CAS + collector notification |
The actor isn’t always the right tool. But when you have mutable state that multiple coroutines need to read and write, with business rules governing those mutations, the actor pattern turns a concurrency problem into a sequential programming problem. That’s a trade worth taking.