4 min read
Coroutine Flows

Blog Series: Mastering Kotlin Coroutines

Coroutine Flows

In this article, we will dive deep into Kotlin Flows, an essential tool for reactive programming with coroutines. Flows provide a clean and efficient way to handle asynchronous streams of data, making them a powerful alternative to traditional callbacks or libraries like RxJava.


What Are Flows?

A Flow represents a stream of asynchronously computed values that can emit multiple values over time. Unlike suspending functions, which return a single value, Flows are designed to emit a sequence of values.

Key Characteristics of Flows:

  1. Cold Streams:
    • Flows are lazy; they don’t start emitting values until collected.
  2. Structured Concurrency:
    • Flows integrate seamlessly with Kotlin’s structured concurrency.
  3. Reactive Streams:
    • Flows support operations like transformations (map, filter) and combinations (zip, merge).

Simple Flow Example

Let’s start with a simple example of a Flow that emits numbers from 1 to 5 with a delay:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(1000) // Simulating work
        emit(i)     // Emitting values
    }
}

fun main() = runBlocking {
    println("Collecting values...")
    simpleFlow().collect { value ->
        println("Received: $value")
    }
}

How It Works:

  • flow: Creates a Flow that emits values.
  • emit: Sends a value to the collector.
  • collect: Starts collecting values from the Flow.

Intermediate Operators in Flows

Operators allow you to transform and filter data emitted by a Flow. Here are some commonly used operators:

1. map

Transforms each emitted value.

fun main() = runBlocking {
    simpleFlow().map { it * 2 }.collect { value ->
        println("Mapped Value: $value")
    }
}

2. filter

Filters emitted values based on a condition.

fun main() = runBlocking {
    simpleFlow().filter { it % 2 == 0 }.collect { value ->
        println("Filtered Value: $value")
    }
}

3. take

Limits the number of emissions.

fun main() = runBlocking {
    simpleFlow().take(3).collect { value ->
        println("Taken Value: $value")
    }
}

Combining Flows

You can combine multiple Flows to create complex data streams using operators like zip and combine.

Example: Zipping Flows

fun flowA(): Flow<Int> = flow {
    emit(1)
    delay(500)
    emit(2)
}

fun flowB(): Flow<String> = flow {
    emit("A")
    delay(1000)
    emit("B")
}

fun main() = runBlocking {
    flowA().zip(flowB()) { a, b -> "$a$b" }.collect { value ->
        println("Zipped Value: $value")
    }
}

Example: Combining Flows

fun main() = runBlocking {
    val flow1 = (1..3).asFlow()
    val flow2 = flowOf("One", "Two", "Three")

    flow1.combine(flow2) { num, str -> "$num - $str" }
        .collect { println(it) }
}

Hot vs Cold Flows

  1. Cold Flows:

    • Start emitting values only when collected.
    • Each collector receives the same set of emissions.
  2. Hot Flows:

    • Emit values regardless of collectors.
    • Useful for events like user interactions or live data streams.

Example: Hot Flow with SharedFlow

val sharedFlow = MutableSharedFlow<Int>()

fun main() = runBlocking {
    launch {
        sharedFlow.emit(1)
        delay(500)
        sharedFlow.emit(2)
    }

    sharedFlow.collect { value ->
        println("Collector 1: $value")
    }

    sharedFlow.collect { value ->
        println("Collector 2: $value")
    }
}

Exception Handling in Flows

Exceptions in Flows can be handled using operators like catch or by wrapping the Flow in a try-catch block.

Example: Using catch

fun faultyFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        if (i == 3) throw RuntimeException("Error on $i")
        emit(i)
    }
}

fun main() = runBlocking {
    faultyFlow()
        .catch { e -> println("Caught exception: ${e.message}") }
        .collect { value -> println("Received: $value") }
}

Flow Best Practices

  1. Use Lifecycle-Aware Collectors:
    • Use lifecycleScope or viewModelScope in Android to avoid memory leaks.
  2. Limit Side Effects:
    • Keep transformations pure; avoid introducing side effects within operators.
  3. Error Handling:
    • Always handle exceptions gracefully using catch or onCompletion.

Conclusion

Kotlin Flows are a powerful tool for managing asynchronous streams of data. By leveraging operators and combining flows, you can build robust and reactive applications. In the next article, we’ll dive into coroutine exception handling, exploring how to manage and recover from errors effectively.

Stay tuned!