Home · Android & Kotlin Tutorials

Kotlin Flow for Android: Getting Started

In this tutorial, you’ll learn about the basics of Kotlin Flow, and you’ll build an Android app that fetches weather forecast data using Flow.

5/5 1 Rating

Version

  • Kotlin 1.3, Android 5.0, Android Studio 3.6

Kotlin Flow is a new stream processing API developed by JetBrains, the company behind the Kotlin language. It’s an implementation of the Reactive Stream specification, an initiative whose goal is to provide a standard for asynchronous stream processing. Jetbrains built Kotlin Flow on top of Kotlin Coroutines.

By using Flow to handle streams of values, you can transform data in complex multi-threaded ways, by writing just a small bit of code!

In this tutorial, you’ll have a chance to play with several different approaches to handling collections, and you’ll explore the capabilities of Kotlin Flow by building a simple weather app. In the process, you’ll learn about:

  • Data collections and streams.
  • Synchronous and asynchronous API calls.
  • Hot and cold data streams.
  • Exception handling during flow processing.
Note: This tutorial assumes you have a solid knowledge of Android development. If you’re completely new to the topic, please check out our Beginning Android Development with Kotlin series first.

You must also have a basic understanding of Kotlin Coroutines to follow along with this tutorial. You can check out these tutorials to familiarize yourself with Coroutines:

The sample app in this tutorial uses the MVVM architectural pattern. MVVM stands for Model-View-ViewModel and represents a pattern where you update the user interface through reactive streams of data. If you’re not familiar with this pattern, please check out our MVVM on Android video course to familiarize yourself with it.

Getting Started

In the first part of the tutorial, you’ll learn about Kotlin Flow concepts, using the Kotlin Playground app as a… well, a playground! You’ll then use what you’ve learned to develop a more robust Android app that displays data retrieved via API calls.

Download the materials for this tutorial using the Download Materials button at the top or bottom of the page. For now, open the Playground-Starter project in Android Studio. This is just an empty project that will serve as a playground.

Returning Multiple Values

You probably know that suspending functions can return a single value asynchronously. When using suspending functions, you don’t have to worry about threading, the Coroutines API does that for you!

Flow, however, can return multiple values asynchronously, and over time. Asynchronous operations are operations you need to wait on, such as network requests. You never know how long these operations might take! They can take anywhere from a couple of milliseconds to several seconds to get a response. Any long-running operation should be asynchronous because actively waiting for them can freeze your programs.

You’ll see how returning values using suspending functions is very much different from the Flow API, with a few examples!

List

Open main.kt file, and add the following function:

suspend fun getValues(): List<Int> {
  delay(1000)
  return listOf(1, 2, 3)
}

This function computes values and adds those values into a List. delay() simulates a long-running operation, like you would have using a remote API.

Now add a function to process these values:

fun processValues() {
  runBlocking {
    val values = getValues()
    for (value in values) {
      println(value)
    }
  }
}

Call processValues() from main():

fun main() {
  processValues()
}

Build and run the code, using the green “play” icon next to the main function. You’ll get this output after a delay of one second:

1
2
3

When you call getValues(), it returns a List with three values. You then use those values in processValues(). Within a for loop, you iterate over the List and print out the values.

A visual representation of the function is the following:

List Diagram

This is fine for three values. But not if it takes a lot of time to compute these values. In that case, you’d have to wait for all of the values to be computed. If each value takes a second, you would wait hours for thousands of values!

It’s very inefficient as it adds extra delay to data processing. Ideally, you want to start processing each list item as soon as it becomes available. Sequences allow you to do this.

Sequence

Sequences are very similar to lists. But unlike lists, they are lazily evaluated. This means they produce values as you iterate over them, instead of producing them all at once. Refactor getValues() to return a Sequence:

suspend fun getValues(): Sequence<Int> = sequence {
  Thread.sleep(250)
  yield(1)
  Thread.sleep(250)
  yield(2)
  Thread.sleep(250)
  yield(3)
}

Thread.sleep(250) simulates a delay when computing each value.

Build and run the project. You’ll get the same output as before, but this time you won’t have to wait for all the values. You’ll produce and consume them, one at a time:

1
2
3

Now, instead of waiting for all of the items, processValues() consumes each item as getValues() produces it.

Sequences use Iterators under the hood and block while waiting for the next item. This works when returning a simple list, but what if your application needs to communicate with a streaming API?

Channel

Streaming APIs are almost the exact opposite of REST APIs. When communicating with a REST API, you make a request and the API sends a response. A streaming API works differently. It connects to a client and continuously listens to new information, over time. Twitter, for example, provides a streaming API that you can use to stream tweets in real time.

You can use Sequences for synchronous streams. But you need a different solution for asynchronous streams.

For asynchronous streams, you could use Channels from Kotlin Coroutines. Conceptually, you can think of channels as pipes. You send items through one pipe and receive a response through the other. However, a channel represents a hot stream of values. Once again, hot streams start producing values immediately.

And this introduces another set of challenges.

Hot Versus Cold Streams

A channel, which is a hot stream, will produce values even if aren’t listening to them on the other side. And if you are not listening to the stream, you are losing values.

In the following diagram, getValues() emits the items via a channel. processValues() receives 1, 2, 3, and then it stops listening for items. The channel is still producing the items, even when no one is listening:

Sequence Diagram

In practice, you can use a channel to have an open network connection. But that can lead to memory leaks. Or you could forget to subscribe to a channel, and “lose” values.

Hot streams push values even when there is no one consuming them. However, cold streams, start pushing values only when you start collecting!

And Kotlin Flow is an implementation of cold streams, powered by Kotlin Coroutines!

Kotlin Flow Basics

Flow is a stream that produces values asynchronously. Furthermore, Flow uses coroutines internally. And because of this, it enjoys all the perks of structured concurrency.

With structured concurrency, coroutines live for a limited amount of time. This time is connected to the CoroutineScope you start your coroutines in.

When you cancel the scope, you also release any running coroutines. The same rules apply to Kotlin Flow as well. When you cancel the scope, you also dispose of the Flow. You don’t have to free up memory manually! :]

There are some similarities between Kotlin Flow, LiveData and RxJava. All of them provide a way to implement the observer pattern in your code.

  • LiveData is a simple observable data holder. It’s best used to store UI state, such as lists of items. It’s easy to learn and work with. But it doesn’t provide much more than that .
  • RxJava is a very powerful tool for reactive streams. It has many features and a plethora of transformation operators. But it has a steep learning curve!
  • Flow falls somewhere in between LiveData and RxJava. It’s very powerful but also very easy to use! The Flow API even looks a lot like RxJava!

Both Kotlin Flow and RxJava are implementations of the Reactive Stream specification.

However, Flow uses coroutines internally and doesn’t have some of the features RxJava has. Partly because it doesn’t need some features, and partly because some features are still being developed!

Note: In Kotlin 1.3.0 release, core Flow APIs and basic operators are stable. APIs that are not stable have annotations @ExperimentalCoroutinesApi or @FlowPreview.

Now that you’ve had enough theory, it’s time to create your first Flow!

Flow Builders

Navigate to main.kt in the starter project.

You’ll start by creating a simple Flow. To create a Flow, you need to use a flow builder. You’ll start by using the most basic builder – flow { ... }. Add the following code above main():

val namesFlow = flow {
  val names = listOf("Jody", "Steve", "Lance", "Joe")
  for (name in names) {
    delay(100)
    emit(name)
  }
}

Make sure to add the imports from the kotlinx.coroutines package.

Here, you’re using flow() to create a Flow from a suspendable lambda block. Inside the block, you declare names and assigning it to a list of names.

Next, you used a for loop to go through the list of names and emit each name after a small delay. The Flow uses emit() send values to consumers.

There are other Flow builders that you can use for an easy Flow declaration. For example, you can use flowOf() to create a Flow from a fixed set of values:

val namesFlow = flowOf("Jody", "Steve", "Lance", "Joe")

Or you can convert various collections and sequences to a Flow:

val namesFlow = listOf("Jody", "Steve", "Lance", "Joe").asFlow()

Flow Operators

Moreover, you can use operators to transform Flows, as you would do with collections or sequences. There are two types of operators available inside the Flow – intermediate and terminal.

Intermediate Operators

Go back to main.kt and add the following code to main():

fun main() = runBlocking {
  namesFlow
      .map { name -> name.length }
      .filter { length -> length < 5 }
    
  println()
}

Here, you used the Flow of names from earlier and you applied two intermediate operators to it:

  • map transforms each value to another value. Here you transformed name values to their length.
  • filter selects values that meet a condition. Here you chose values that are less than five.

The important thing to notice here is the block of code inside each of these operators. These blocks of code can call suspending functions! So you can also delay within these blocks. Or you can call other suspending functions!

What happens with the Flow is visible on the image below:

Flow Filter Diagram

Flow will emit values one at a time. You then apply each operator to each of the values, once again, one at a time. And finally, when you start consuming values, you'll receive them in the same order.

Build and run by clicking the play button next to the main function.

You'll notice that nothing happens! This is because intermediate operators are cold. When you invoke an intermediate operation on a Flow, the operation is not executed immediately. Instead, you return the transformed Flow, which is still cold. The operations execute only when you invoke a terminal operator on the final stream.

Terminal Operators

Because Flows are cold, they won't produce values until a terminal operator is called. Terminal operators are suspending functions that start the collection of the flow. When you invoke a terminal operator, you invoke all the intermediate operators along with it:

An example of what would happen with original values, if you were to listen to a Flow:

Terminal Operation Diagram

As you start collecting values, you get one at a time, and you don't block while waiting for new values!

Now go back to the main.kt file and add the collect() terminal operator:

fun main() = runBlocking {
  namesFlow
      .map { name -> name.length }
      .filter { length -> length < 5 }
      .collect { println(it) }

  println()
}

Since collect() is a suspending function, it can only be called from a coroutine or another suspending function. This is why you wrap the code with runBlocking().

Build and run the code by clicking the play button. You'll get the following output:

4
3

collect() is the most basic terminal operator. It collects values from a Flow and executes an action with each item. In this case, you're printing an item to the console. There are other terminal operators available; you'll learn about them later in this tutorial.

You can check out the final code in the Playground-Final project.

Now that you know the basics of Flow, let's move to our weather app, where you'll see Kotlin Flow doing real work! :]

Flow on Android

Now you'll apply everything you've learned so far in an Android app! The Sunzoid app is a simple weather app that displays a forecast for a specific city. It fetches the weather data from the network and stores it into a database to support offline mode.

Open the Sunzoid-Starter project in Android Studio. Build and run the app, and you'll see an empty screen:

Sunzoid Initial Screen

There's a search icon in the top left corner. You can tap it to enter a specific location. If you do that now, nothing will happen. But hang on — you're going to implement this functionality next!

There's a fair amount of code in the starter project:

Sunzoid Project Structure

You'll focus on the use of Kotlin Flow in the app. But if you want, you can explore the code, and get familiar with the app!

The starter project follows Google's recommended guide to app architecture. You can find the guide on the Android developer site documentation:

Google Recommended Architecture

Copyright 2020 Google LLC

At the top of the scheme, there's a UI layer that talks to the ViewModel architecture component. ViewModel communicates with a data repository. The repository fetches the data from the network using Retrofit. It stores the data in a local Room database. Finally, it exposes the database data to the ViewModel.

Room and Retrofit, in their latest versions, support Kotlin Coroutines. The starter project is set up to use them with coroutines.

You'll use Kotlin Flow to pass the data from the database to the ViewModel. The ViewModel will then collect the data. You'll also use coroutines and Flow to implement the search functionality.

Fetch Data

You'll start by implementing the logic to fetch the forecast data. Open HomeActivity.kt. In onCreate(), add a call to fetchLocationDetails(), right below initUi():

homeViewModel.fetchLocationDetails(851128)

fetchLocationDetails() accepts a cityId as an argument. For now, you'll pass the hardcoded ID. You'll add a search feature later that will allow you to search for a specific location.

Build and run the project. You still won't see anything on the screen:

Sunzoid Empty Screen

But this time the app has fetched the forecast data and saved it to the Room database! :]

Room and Flow

In Room 2.1, the library added coroutine support for one-off operations. Room 2.2 added Flow support for observable queries. This enables you to get notified any time you add or remove entries in the database.

In the current implementation, only the user can trigger data fetching. But you can easily implement logic that schedules and updates the database every three hours, for example. By doing this, you make sure your UI is up to date with the latest data. You'll use Kotlin Flow to get notified of every change in the table.

Plugging Into the Database
Open ForecastDao.kt and add a call to getForecasts(). This method returns Flow<List<DbForecast>>:

@Query("SELECT * FROM forecasts_table")
fun getForecasts(): Flow<List<DbForecast>>

getForecasts() returns forecast data for a specific city from forecasts_table. Whenever data in this table changes, the query executes again and Flow emits fresh data.

Next, open WeatherRepository.kt and add a function called getForecasts:

fun getForecasts(): Flow<List<Forecast>>

Next, add the implementation to WeatherRepositoryImpl.kt:

override fun getForecasts() =
    forecastDao
      .getForecasts()
      .map { dbMapper.mapDbForecastsToDomain(it) }

This method uses the forecastDao to get data from the database. The database returns the database model. It's a good practice for every layer in the app to work with its own model. Using map(), you convert the database model to the Forecast domain model.

Open HomeViewModel.kt and add forecasts, like so:

//1
val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    //2
    .getForecasts()
    //3
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
    }
    //4
    .asLiveData()

There are a few things going on here:

  1. First, you declare forecasts of the LiveData<List<ForecastViewState>> type. The Activity will observe changes in forecasts. forecasts could have been of the Flow<List<ForecastViewState>> type, but LiveData is preferred when implementing communication between View and ViewModel. This is because LiveData has internal lifecycle handling!
  2. Next, reference weatherRepository to get the Flow of forecast data.
  3. Then call map(). map() converts the domain models to the ForecastViewState model, which is ready for rendering.
  4. Finally, convert a Flow to LiveData, using asLiveData(). This function is from the AndroidX KTX library for Lifecycle and LiveData.

Context Preservation and Backpressure

The collection of a Flow always happens in the context of the parent coroutine. This property of Flow is called context preservation. But you can still change the context when emitting items. To change the context of emissions you can use flowOn().

You could have a scenario in which the Flow produces events faster than the collector can consume them. In reactive streams, this is called backpressure. Kotlin Flow supports backpressure out of the box since it's based on coroutines. When the consumer is in a suspended state or is busy doing some work, the producer will recognize that. It will not produce any items during this time.

Observing Values

Finally, open HomeActivity.kt and observe forecasts from initObservers():

homeViewModel.forecasts.observe(this, Observer {
  forecastAdapter.setData(it)
})

Whenever forecasts change in the database, you'll receive new data in the Observer, and display it on the UI.

Build and run the app. Now the home screen displays forecast data! :]

Sunzoid With Forecast Data

Congratulations! You've implemented communication between multiple layers of your app using Flow and LiveData!

Cancellation

In HomeViewModel.kt, you're observing the forecasts. You've noticed that you never stop observing. How long is this observed, then?

In this case, the Flow collection starts when LiveData becomes active. Then, if LiveData becomes inactive before the Flow completes, the flow collection is canceled.

The cancellation occurs after a timed delay unless LiveData becomes active again before that timeout. The default delay triggering cancellation is 5000 milliseconds. You can customize the timeout value if necessary. The timeout exists to handle cases like Android configuration changes.

If LiveData becomes active again after cancellation, the Flow collection restarts.

Exceptions

Flow streams can complete with an exception if an emitter or code inside the operators throws an exception. catch() blocks handle exceptions within Flows. You can do this imperatively or declaratively. A try-catch block on the collector's side is an example of an imperative approach.

It's imperative because these catch any exceptions that occur in the emitter or in any of the operators.

You can use catch() to handle errors declaratively instead. Declarative here means you declare the function to handle errors. And you declare it within the Flow itself, and not a try-catch block.

Open HomeViewModel.kt and navigate to forecasts. Add catch() right before map(). To simulate errors in the stream, throw an exception from map():

val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    .getForecasts()
    .catch {
      // Log Error
    }
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
      throw Exception()
    }
    .asLiveData()

Build and run the app. You'll notice that the app crashes! catch() catches only upstream exceptions. That is, it catches exceptions from all the operators above the catch. catch() doesn't catch any exception that occurs after the operator.

Now move catch() below map():

val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    .getForecasts()
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
      throw Exception()
    }
    .catch {
      // Log Error
    }
    .asLiveData()

Build and run the app again. Now you'll see an empty screen:

Sunzoid Thrown Exception

This is an example of exception transparency, where you're able to separate the handling of exceptions that occur in the Flow from the collection of values. You're also being transparent about exceptions, as you don't hide any errors, you explicitly handle them in an operator!

Before proceeding, remove the line that throws an exception from map().

Searching Locations

So far, your app displayed a forecast for a hardcoded location. Now you'll implement the search functionality! This will allow users to search for a specific location using coroutines and Flow. As the user types in the search box, the app will perform a search for every letter typed and will update the search result.

In HomeActivity.kt, you already have a listener attached to the search view. When the user changes query text, the app sends the new value to queryChannel in HomeViewModel.kt. HomeViewModel.kt uses a BroadcastChannel as a bridge to pass the text from the view to the ViewModel. offer() passes the text and synchronously adds the specified element to the channel.

Querying Locations

Now add the logic for consuming the events from the channel as a Flow:

private val _locations = queryChannel
    //1
    .asFlow()
    //2
    .debounce(SEARCH_DELAY_MILLIS)
    //3
    .mapLatest {
      if (it.length >= MIN_QUERY_LENGTH) {
        getLocations(it)
      } else {
        emptyList()
      }
    }
    //4
    .catch {
      // Log Error
    }

Here's what happens in this block of code:

  1. First, the call to asFlow converts the Channel into a Flow.
  2. Next, debounce() waits for values to stop arriving for a given time period. This is used to avoid processing every single letter typed by users. Users usually type several letters in a row. You don't need to make a network request until the user stops typing. This ensures that you're performing the API call only after 500 milliseconds have passed with no typing!
  3. Then, mapLatest() performs the API call and returns location results. If the original flow emits a new value while the previous API call is still in progress, mapLatest() ensures that computation of the previous block is canceled. mapLatest() performs the API call only if the search query contains at least two characters.
  4. Finally, catch() handles errors.

Add locations to HomeViewModel.kt. This allows you to observe from the activity:

val locations = _locations.asLiveData()

Here you're using asLiveData() to collect values from the origin flow and add transform them to a LiveData instance.

Open HomeActivity.kt and delete the call from onCreate() to homeViewModel.fetchLocationDetails(). Instead observe locations from initObservers():

private fun initObservers() {
  homeViewModel.locations.observe(this, Observer {
    locationAdapter.setData(it)
  })

  ...
}

Once again, build and run the app. Now enter a search query. You'll see the options generated from your query:

Sunzoid With Location Query

Go ahead and tap any of the options. The home screen will display forecast data for a newly selected location.

Why Kotlin Flow

There are already other implementations of the reactive stream specification, such as RxJava. So why use Kotlin Flow? One reason is that you can't use JVM-specific libraries like RxJava in Kotlin multi-platform projects. But Flow is part of the Kotlin language, so it's ideal to use in Kotlin multi-platform projects.

Also, Kotlin Flow has fewer operators, but they are much simpler. A single operator can handle both synchronous and asynchronous logic since the block of code that operators accept can be suspended!

And Kotlin Flow is interoperable with other reactive streams and coroutines. Since it's built on top of coroutines, it provides all the perks of structured concurrency and cancellation. In combination with suspending functions, it produces a simple, readable, and understandable API. It also supports backpressure out of the box.

Where To Go From Here?

You can download the final project using the Download Materials button at the top or bottom of this tutorial.

You've created an app that uses Kotlin Flow for communication between application layers. In the process, you learned what is so special about Kotlin Flow and how it's different from existing solutions. You also used Flow builders to create a Flow. You became familiar with basic operators to transform a stream of data. And finally, you applied terminal operators to consume the values!

To gain more in-depth knowledge about Kotlin Flow, check out our Kotlin Flow: Getting Started video course. You'll learn a lot more about Flow builders, operators, and cancellation. You'll also learn how to work with Flows, change its context, add buffers, combine multiple Flows, and handle exceptions!

We hope you've enjoyed this tutorial! If you have any questions or comments to share, please join the forum discussion below. :]

Average Rating

5/5

Add a rating for this content

1 rating

More like this

Contributors

Comments