Chapters

Hide chapters

Kotlin Coroutines by Tutorials

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 3.5

Section I: Introduction to Coroutines

Section 1: 9 chapters
Show chapters Hide chapters

13. Producers & Actors
Written by Filip Babić

Heads up... You're reading this book for free, with parts of this chapter shown beyond this point as scrambled text.

Multi-processing communication is one of the most enticing challenges you can face when developing multi-threaded software. Multi-threading technologies have been around for years and there are a few solutions that are industry standard. In this chapter, you’ll be learning about two of those solutions, which can be really useful in your future applications.

Producing and consuming data

The first is the producer-consumer problem, which describes a two-process communication standard. One process produces data, places it in a queue, while the other picks items off of the queue, one by one, and consumes it. Hence the name. A problem arises if the consumer tries to pick data off an empty queue, or if the producer tries to overfill the queue. This is familiar to what you’ve learned so far in the book.

Furthermore, this pattern doesn’t have to describe a 1:1 relationship. One approach is for the producer to try and push as many events in the queue, and the system to consume them as fast as possible, using multiple consumers.

If you think about it, you can picture a thread pool the same way. You have one producer or worker, and threads, which are the consumers.

Producer-consumer problem

Just like with pipelines, producers face the same challenge. As previously mentioned, a full or an empty queue could cause loss of information, thread blocking or exceptions. On the other hand, creating producers with coroutines is much easier and avoids these problems. Let’s see how you’d create a basic producer using the Coroutines API.

Creating a producer

If you haven’t already, open up the starter project for this chapter with the name produce_actor. Then, open Produce.kt in the producer package in the project. Finally, to create a producer, call produce(), on a CoroutineScope, in main(), like so:

val producer = GlobalScope.produce<Int>(capacity = 10) {}
public fun <E> CoroutineScope.produce(
  context: CoroutineContext = EmptyCoroutineContext,
  capacity: Int = 0,
  @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>

Producing values

The producer has to produce some values. Since the return type of produce() is a ReceiveChannel, you can’t use it for sending values. You have to do it within the lambda you pass as parameter. The simplest way would be using a loop. Change the code in main(), in Produce.kt, to this:

val producer = GlobalScope.produce(capacity = 10) {
  while (isActive) {
    if (!isClosedForSend) {
      val number = Random.nextInt(0, 20)
      if (offer(number)) {
        println("$number sent")
      } else {
        println("$number discarded")
      }
    }
  }
}
val producer = GlobalScope.produce(capacity = 10) {
  while (isActive) {
    if (!isClosedForSend) {
      val number = Random.nextInt(0, 20)
      if (offer(number)) {
        println("$number sent")
      } else {
        println("$number discarded")
      }
    }
  }
}
Thread.sleep(30L)
3 sent
17 sent
2 sent
9 sent
15 sent
11 sent
16 sent
13 sent
10 sent
11 sent
9 discarded
8 discarded
. . .
val producer = GlobalScope.produce(capacity = 10) {
  while (isActive) {
    val number = Random.nextInt(0, 20)
    if (!isFull) {
      if (offer(number)) {
        println("$number sent")
      }
    } else {
      println("$number discarded")
    }
  }
}
Thread.sleep(30L)
val producer = GlobalScope.produce(capacity = 10) {
  while (isActive) {
    val number = Random.nextInt(0, 20)
    send(number)
    println("$number sent")
  }
}
Thread.sleep(30L)
11 sent
3 sent
16 sent
1 sent
11 sent
17 sent
15 sent
11 sent
3 sent
11 sent

Consuming the values

The produce function conveniently returns a ReceiveChannel. This means you can iterate through the values, transform or filter them, or run an infinite loop, reading the values.

while (!producer.isClosedForReceive) {
  val number = producer.poll()

  if (number != null) {
    println("$number received")
  }
}
12 sent
18 sent
19 sent
12 sent
17 sent
6 sent
9 sent
9 sent
1 sent
5 sent
12 received
3 sent
18 received
. . .
GlobalScope.launch {
  producer.consumeEach { println("$it received") }
}

Thread.sleep(30L)
GlobalScope.launch {
  while (isActive) {
    val value = producer.receive()
    println("$value received")
  }
}

Thread.sleep(30L)
GlobalScope.launch {
  for (value in producer) {
    println("$value received")
  }
}

Thread.sleep(30L)

Acting upon data

The actor model is a bit different from what you’ve seen so far and it exists as a possible solution for a very common problem: sharing data in a multithreading environment. When you create an instance of an object, you know you can interact with it using the operations it exposes: its interface. Most of the objects also have some state, which can change after interacting with other objects. Everything is simple if all the objects collaborate on the same thread. In a multithreading environment you have to introduce complexity in the code in order to make all the classes type safe. When multiple threads access shared data you know you can have problems like data race conditions. Locks, implicit or not, are a possible solution but are usually difficult to manage and test.

Actors interact using messages
Echupp uftayosz okomw jiljoviw

Handling actors properly

Having many actors at your disposal, being limited only by memory, is both a great thing and a challenge. The true challenge comes when you have to clean up old actors. If you hold a reference to your actor children, in each of the actors, you’d end up with a large reference tree. And you couldn’t clean up actors as you go, since they hold references to new, fresh actors, as well.

Building actors using coroutines

Note: The Jetbrains team is currently working on both the Flow API, and complex actors. They haven’t yet decided what to do with the current Actor API, and as such it’s been marked as obsolete. However, the API still works, so it’s worth checking it out.

public fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
// 1
object completionHandler : CompletionHandler {
  override fun invoke(cause: Throwable?) {
    println("Completed!")
  }
}

fun main() {
  // 2
  val actor = GlobalScope.actor<String>(
      onCompletion = completionHandler,
      capacity = 10) {
    // 3
    for (data in channel) {
      println(data)
    }
  }

  // 4
  (1..10).forEach {
    actor.offer(Random.nextInt(0, 20).toString())
  }
  // 5
  actor.close()
  // 6
  Thread.sleep(500L)
}
5
4
19
10
8
15
5
6
0
8
Completed!

Delegating actor workload

The actor model, however, relies on delegating excess work to others. So, for example, if you’re building a robot-powered-storage system, where everything is organized by robots, you have to find a way to optimize the workload.

fun main() {

  val items = listOf(
      Package(1, "coffee"),
      Package(2, "chair"),
      Package(3, "sugar"),
      Package(4, "t-shirts"),
      Package(5, "pillowcases"),
      Package(6, "cellphones"),
      Package(7, "skateboard"),
      Package(8, "cactus plants"),
      Package(9, "lamps"),
      Package(10, "ice cream"),
      Package(11, "rubber duckies"),
      Package(12, "blankets"),
      Package(13, "glass")
  )

  val initialRobot = WarehouseRobot(1, items)

  initialRobot.organizeItems()
  Thread.sleep(5000)
}
class WarehouseRobot(private val id: Int,
                     private var packages: List<Package>) {

  companion object {
    private const val ROBOT_CAPACITY = 3
  }
...
}
 private fun processItems(items: List<Package>) {
    val actor = GlobalScope.actor<Package>(
    capacity = ROBOT_CAPACITY) {

      var hasProcessedItems = false

      while (!packages.isEmpty()) {
        val currentPackage = poll()

        currentPackage?.run {
          organize(this)

          packages -= currentPackage
          hasProcessedItems = true
        }

        if (hasProcessedItems && currentPackage == null) {
          cancel()
        }
      }
    }

    items.forEach { actor.offer(it) }
  }

  private fun organize(warehousePackage: Package) =
      println("Organized package " +
        "${warehousePackage.id}:" +
        warehousePackage.name)
fun organizeItems() {
  val itemsToProcess = packages.take(ROBOT_CAPACITY)
  val leftoverItems = packages.drop(ROBOT_CAPACITY)

  packages = itemsToProcess
  
  val packageIds = packages.map { it.id }
        .fold("") { acc, item -> "$acc$item " }

  processItems(itemsToProcess)

  if (leftoverItems.isNotEmpty()) {
    GlobalScope.launch {
      val helperRobot = WarehouseRobot(id.inc(), leftoverItems)

      helperRobot.organizeItems()
    }
  }

  println("Robot #$id processed following packages:$packageIds")
}
Organized package 1:coffee
Organized package 2:chair
Organized package 3:sugar
Robot #2 processed following packages:4 5 6 
Robot #1 processed following packages:1 2 3 
Organized package 7:skateboard
Organized package 8:cactus plants
Organized package 9:lamps
Robot #3 processed following packages:7 8 9 
Organized package 4:t-shirts
Organized package 5:pillowcases
Organized package 6:cellphones
Robot #4 processed following packages:10 11 12 
Organized package 10:ice cream
Organized package 11:rubber duckies
Organized package 12:blankets
Organized package 13:glass
Robot #5 processed following packages:13

Acting in parallel

Right now, the packages are mostly ordered, with a few exceptions to the rule. This means that, usually, once the first robot finishes its work, the helper robot starts on its packages. However, you’re building recursion-like work, which doesn’t suffer from the StackOverflowException or OutOfMemory exception, since the actors get cleaned up one by one.

fun organizeItems() {
  ...
  if (leftoverItems.isNotEmpty()) {
    GlobalScope.launch {
      val helperRobot = WarehouseRobot(id.inc(), leftoverItems)

      helperRobot.organizeItems()
    }
  }

  processItems(itemsToProcess)
  ...
}
Robot #3 processed following packages:7 8 9 
Robot #1 processed following packages:1 2 3 
Robot #5 processed following packages:13 
Robot #2 processed following packages:4 5 6 
Robot #4 processed following packages:10 11 12 
Organized package 10:ice cream
Organized package 4:t-shirts
Organized package 11:rubber duckies
Organized package 5:pillowcases
Organized package 1:coffee
Organized package 13:glass
Organized package 2:chair
Organized package 6:cellphones
Organized package 12:blankets
Organized package 3:sugar
Organized package 7:skateboard
Organized package 8:cactus plants
Organized package 9:lamps

Key points

  • Produce-consumer pattern and the actor model are tried and tested mechanisms for multi-threading.
  • Producer-consumer relationships are one-to-many, where you can consume the events from multiple places.
  • The actor model is a way to share data in a multithread environment using a dedicated queue.
  • The actor model allows you to offload large amounts of work to many smaller constructs.
  • Actors have a many-to-one relationship, since you can send events from multiple places, but they all end up in one actor.
  • Each actor can create new actors, delegating and offloading work.
  • Building actors using threads can be expensive, which is where coroutines come in handy.
  • Actors can be arranged to run in sequential order, or to run in parallel.

Where to go from here?

You now know how to build effective communication mechanisms, using produce() and actor(). You’ve got everything you need to connect multiple threads, or to fan out a large workload. In the next few chapters, you’ll see how to build a different kind of mechanism of communication like broadcasting. You’ll also see how channels’ data can be transformed and combined.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2024 Kodeco Inc.

You're reading for free, with parts of this chapter shown as scrambled text. Unlock this book, and our entire catalogue of books and videos, with a Kodeco Personal Plan.

Unlock now