Home Android & Kotlin Books Kotlin Coroutines by Tutorials

14
Beginning with Coroutine Flow Written by Filip Babić

Heads up... You're reading this book for free, with parts of this chapter shown beyond this point as scrambled text.

You can unlock the rest of this book, and our entire catalogue of books and videos, with a raywenderlich.com Professional subscription.

Coroutines are amazing when it comes to bridging the synchronous and asynchronous worlds, to return values and communicate between threads. Most of the time that’s what you want and need, but sometimes, computer systems require you to consume multiple values over a period of time.

And there are two different ways you can do this - using sequences and streams. However there are certain limitations to both approaches. You’ve already learned about sequences, but they force you to block the calling thread when you’re observing values. So let’s see what streams have to offer, and how they behave in code.

Streams of data

One of the key similarities between sequences and streams is that both constructs can generate an infinite amount of elements. Sequences usually do this by defining an operation which you run behind the scenes to build a value.

This is also the key difference between streams and sequences, as you usually build streams using a function or their constructor. You then have an interface between the provider of values and a consumer, exposing a different part of the interface to each side.

Take this snippet for example, which uses the Reactive Extensions, or Rx, version of observable streams of data:

val subject = BehaviorSubject.create<String>()

subject.subscribe(observer)
subject.onNext("one")
subject.onNext("two")
subject.onNext("three")

You create a Subject, which implements both sides of the stream interface. The provider can use functions such as offer(), onNext(), and send(), to fill the queue for the stream with values to consume. In this case it’s using onNext() from Rx.

Every Observer which subscribes to this stream will receive all its events, from the moment they subscribed, until they unsubscribe, or the stream closes. The observer in Rx will look like this:

val observer = object: Observer<String> {
  
  override fun onNext(value: String) {
    // consume the value
  }
  
  override fun onError(throwable: Throwable) {
    // handle the error
  }
  
  override fun onComplete() {
    // the stream completed
  }
}

Every time you send any of the events to the Observable side of the Subject, it will send all those events to all of its Observers. It acts as a relay of data from one central point to multiple observing nodes. This is the general idea of streams. Being observable and sending the events to every single Observer which is listening to its data.

But, depending on the implementation of streams, you might have a different setup. One of the things each stream mechanism and implementation shares is the type of streams and when their values are propagated. As such, there are hot and cold streams of data. Let’s consume them one at a time.

Hot streams

Hot streams behave just like TV channels, or radio stations. They keep sending events, and emitting their data, even though no one may be listening or watching the show. It’s why they are called hot. As they don’t care if there are any observers, they will keep on working and computing no matter what, from the moment you create them, until they close.

Cold streams

It makes sense that, if hot streams are computed right away, and work even without any observers, cold streams do the opposite. They are like the builder pattern, where you define a set of behaviors for a construct, upfront, and only when you call a finalizing function, does it become live and active.

Limitations of streams

In every-day programming, there are certain limitations to the way things should operate for optimal use. You don’t want to waste resources, freeze the UI, lose data and so on. Some of these concepts apply to streams, as well. These limitations revolve around the same problem - the speed of producing and consuming the values.

Supporting backpressure

As you’ve learned, if one side of the producer-consumer pair is too fast or too slow in its job, you will lose data, or end up blocking the non-bottlenecked side. Unless you add backpressure support.

A new approach to streams

Having the best of both worlds, the Flow API supports cold, asynchronously-built streams of values, where the thread communication and backpressure support is implemented through the use of coroutines. When you think about it, it’s the perfect combination.

Building Flows

To create a Flow, just like with standard coroutines, you have to use a builder. But first open up Main.kt, in the starter project, which you can find by navigating to the project files, the starter folder, and opening the beginning_with_coroutines_flow folder.

val flowOfStrings = flow {
  for (number in 0..100) {
    emit("Emitting: $number")
  }
}
GlobalScope.launch {
  flowOfStrings.collect { value ->
    println(value)
  }
}

Thread.sleep(1000)
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> 
  = SafeFlow(block)

Collecting and transforming values

Once you build a Flow, you can do many things with the stream, before the values reach the FlowCollector. Just like with Rx, or with collections in Kotlin, you can transform the values using operators like map(), flatMap(), reduce() and much more. Additionally, you can use operators like debounce(), delayFlow() and delayEach() to apply backpressure or delays manually for each item, or the entire Flow.

GlobalScope.launch {
  flowOfStrings
      .map { it.split(" ") }
      .map { it.last() }
      .delayEach(100)
      .collect { value ->
        println(value)
      }
}

Switching the context

Another thing you can do with Flow events, is switch the context in which you’ll consume them. To do that, you have to call flowOn(context: CoroutineContext), just like this:

GlobalScope.launch {
  flowOfStrings
    .map { it.split(" ") }
    .map { it.last() }
    .flowOn(Dispatchers.IO)
    .delayEach(100)
    .flowOn(Dispatchers.Default)
    .collect { value ->
      println(value)
    }
}
/**
 * Changes the context where this flow is executed to 
 * the given [context]. This operator is composable 
 * and affects only preceding operators that do not have 
 * its own context.
 * This operator is context preserving: [context] **does not** 
 * leak into the downstream flow.
 ...
 **/

Flow Constraints

Since Flow is really easy to use as-is, there have to be some constraints in order to keep people from abusing or breaking the API. There are two main things which each Flow should adhere to, and each use case should enforce - preserving the context and being transparent with exceptions.

Preserving the Flow context

As mentioned above, you have to be clean when using CoroutineContexts with the Flow API. The producing and consuming contexts have to be the same. This effectively means that you cannot have concurrent value production, because the Flow itself is not thread safe, and doesn’t allow for such emmisions.

val flowOfStrings = flow {
  for (number in 0..100) {

    GlobalScope.launch {
      emit("Emitting: $number")
    }
  }
}

GlobalScope.launch {
  flowOfStrings.collect()
}
val flowOfStrings = channelFlow {
  for (number in 0..100) {

    withContext(Dispatchers.IO) {
      offer("Emitting: $number")
    }
  }
}

GlobalScope.launch {
  flowOfStrings.collect()
}

Being transparent with exceptions

When dealing with exceptions in coroutines, it’s relatively easy to bury them down. For example, by using async(), you could effectively receive an exception, but if you never call await(), you’re not going to throw it for the coroutines to catch. Additionally, if you add a CoroutineExceptionHandler, when exceptions occur in coroutines they get propagated to it, ending the coroutine.

flowOfStrings
  .map { it.split(" ") }
  .map { it[1] }
  .catch { it.printStackTrace() }
  .flowOn(Dispatchers.Default)
  .collect { println(it) }
val flowOfStrings = flow {
  emit("")

  for (number in 0..100) {
    emit("Emitting: $number")
  }
}
println("The code still works!")
flowOfStrings
.map { it.split(" ") }
.map { it[1] }
.catch {
  it.printStackTrace()
  // send the fallback value or values
  emit("Fallback")
}
.flowOn(Dispatchers.Default)
.collect { println(it) }

println("The code still works!")

Key Points

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.

Have feedback to share about the online reading experience? If you have feedback about the UI, UX, highlighting, or other features of our online readers, you can send them to the design team with the form below:

© 2021 Razeware LLC

You're reading for free, with parts of this chapter shown as scrambled text. Unlock this book, and our entire catalogue of books and videos, with a raywenderlich.com Professional subscription.

Unlock Now

To highlight or take notes, you’ll need to own this book in a subscription or purchased by itself.