11
Channels
Written by Nishant Srivastava
From the previous chapters, you already learned how to deal with sending a request for receiving a single value. This approach works perfectly when you just need to get a value once and show it to the user, e.g., fetching a user profile or downloading an image. In this chapter, you will learn how to send and receive streams of values.
Streams are convenient when you need to continuously get updates of data or handle a potentially infinite sequence of items. Kotlin isn’t the first to offer a solution to these problems: Observable from ReactiveX and Queue from Java solve them as well. How Channels compare with Observable and Queue, as well as their benefits and disadvantages, will be covered further in this book.
Note: A stream is a source or repository of data that can be read or written only sequentially while a thread is a unit of execution, lighter in weight than a process, generally expected to share memory and other resources with other threads executed concurrently.
Getting started with channels
Channels are conceptually similar to reactive streams. A channel is a simple abstraction that you can use to transfer a stream of values between coroutines. Consider a source that sends content to a destination that receives it; i.e., elements are sent into the channel by producer coroutines and are received by consumer coroutines. Essentially, channels are like blocking queues, except that their operations suspend the calling coroutine instead of blocking a thread.
A fundamental property of a channel, and an important concept to understand, is its capacity, which defines the maximum number of elements that the channel can hold in its buffer. Suppose you have a channel with capacity N. A producer can send values into the channel but, once the buffer holds N elements, the producer suspends until a consumer reads from the same channel and frees up space. You can think of the capacity as the size of the buffer for a specific channel; it’s a way to optimize performance when producing and consuming take different amounts of time.
You can change the default capacity of a channel by passing it as an argument to its factory method. Take a look at the following method signature:
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E>
You will notice that the default capacity is set to RENDEZVOUS, which corresponds to 0 as per the source code:
public const val RENDEZVOUS = 0
What does it mean in practice? It means that the producer can’t hand over a value until there is a consumer ready to receive it; essentially, there is no buffer.
An element is transferred from producer to consumer only when the producer’s send and the consumer’s receive invocations meet in time (rendezvous). Because of this, the send function suspends until another coroutine invokes receive, and receive suspends until another coroutine invokes send. This is the reason for the RENDEZVOUS name.
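To see capacity and rendezvous in action, here is a minimal sketch. It assumes a kotlinx.coroutines version that provides trySend (1.5 or later), which attempts a send without suspending; the helper name trySendAll is made up for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to send `attempts` values without suspending
// and records which attempts the channel accepted.
fun trySendAll(channel: Channel<Int>, attempts: Int): List<Boolean> =
  (1..attempts).map { channel.trySend(it).isSuccess }

fun main() {
  // Rendezvous channel: no buffer and no receiver waiting, so nothing is accepted.
  println(trySendAll(Channel<Int>(), 3))  // [false, false, false]
  // Capacity 2: the first two values fit in the buffer, the third does not.
  println(trySendAll(Channel<Int>(2), 3)) // [true, true, false]
}
```

With the suspending send function, the rejected attempts would instead suspend the producer until a consumer made room.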
Note: The same happens in Java with the SynchronousQueue class.
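The following sketch illustrates that note from Kotlin, using only the JDK. The helper handOff is hypothetical; SynchronousQueue, offer, put and take are the standard java.util.concurrent API. Keep in mind that these calls block the thread, whereas channel operations suspend the coroutine.

```kotlin
import java.util.concurrent.SynchronousQueue
import kotlin.concurrent.thread

// Hypothetical helper: hands one value to a consumer thread through a
// SynchronousQueue and returns what the consumer took.
fun handOff(value: Int): Int {
  val queue = SynchronousQueue<Int>()
  var taken = -1
  // take() blocks the consumer thread until a producer arrives
  val consumer = thread { taken = queue.take() }
  // put() blocks the calling thread until the consumer takes the value
  queue.put(value)
  consumer.join()
  return taken
}

fun main() {
  // With no consumer waiting, the non-blocking offer is rejected: there is no buffer.
  println(SynchronousQueue<Int>().offer(1)) // false
  println(handOff(42))                      // 42
}
```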
Creating a channel is pretty straightforward. Write the following:
val kotlinChannel = Channel<Int>()
Consuming its values can be done via the usual for loop:
for (x in kotlinChannel) {
  println(x)
}
Channels implement the SendChannel and ReceiveChannel interfaces.
public interface SendChannel<in E> {
  @ExperimentalCoroutinesApi
  public val isClosedForSend: Boolean
  public suspend fun send(element: E)
  public fun offer(element: E): Boolean
  public fun close(cause: Throwable? = null): Boolean
  ...
}
public interface ReceiveChannel<out E> {
  @ExperimentalCoroutinesApi
  public val isClosedForReceive: Boolean
  public suspend fun receive(): E
  public fun cancel(): Unit
  ...
}
Notice that SendChannel exposes the close operation, which is used, unsurprisingly, for closing the channel. As soon as the sender calls close() on the channel, the value of isClosedForSend becomes true.
Note: close() is an idempotent operation; repeated invocations of this function have no effect and return false.
You can’t send any message into a closed channel. Closing a channel conceptually works by sending a special close token over it. You close a channel when you have a finite sequence of elements to be processed by consumers. You must then signal to the consumers that this sequence is over. The iteration stops as soon as this close token is received, so there is a guarantee that all previously sent elements before the close are received. You don’t have to close a channel otherwise.
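The closing rules described above can be checked with a small sketch. The three helper functions are hypothetical names introduced for this illustration; close, send and ClosedSendChannelException are part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.runBlocking

// Returns the results of calling close() twice on the same channel.
fun closeTwice(): Pair<Boolean, Boolean> {
  val channel = Channel<Int>()
  return channel.close() to channel.close()
}

// Sends two values into a buffered channel, closes it, and then drains it.
fun drainAfterClose(): List<String> = runBlocking {
  val channel = Channel<String>(2)
  channel.send("Apple")
  channel.send("Banana")
  channel.close()
  val received = mutableListOf<String>()
  // The loop delivers everything sent before close() and then stops
  for (fruit in channel) received += fruit
  received
}

// Returns true if sending into a closed channel throws ClosedSendChannelException.
fun sendAfterCloseFails(): Boolean = runBlocking {
  val channel = Channel<Int>(1)
  channel.close()
  try {
    channel.send(1)
    false
  } catch (e: ClosedSendChannelException) {
    true
  }
}

fun main() {
  println(closeTwice())          // (true, false)
  println(drainAfterClose())     // [Apple, Banana]
  println(sendAfterCloseFails()) // true
}
```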
On the other hand, ReceiveChannel exposes the cancel operation, which cancels the reception of remaining elements from the channel. Once finished, this function closes the channel and removes all messages in the buffer, if any. After cancel() completes, isClosedForReceive starts returning true. If the producer has already closed the channel by invoking the close() function, then isClosedForReceive returns true only after all previously sent elements are received.
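Here is a minimal sketch of the difference between the two operations. The helper names are hypothetical, and the snippet assumes a kotlinx.coroutines version with trySend and tryReceive (1.5 or later). The opt-in annotation is there because isClosedForReceive is marked as experimental or delicate, depending on the library version.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel

// Buffers two elements, then either cancels or closes the channel,
// and reports isClosedForReceive right afterwards.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun closedForReceiveAfter(cancel: Boolean): Boolean {
  val channel = Channel<Int>(2)
  channel.trySend(1)
  channel.trySend(2)
  if (cancel) channel.cancel() else channel.close()
  return channel.isClosedForReceive
}

// Buffers one element, closes the channel, receives the element,
// and only then reports isClosedForReceive.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun closedForReceiveAfterDraining(): Boolean {
  val channel = Channel<Int>(2)
  channel.trySend(1)
  channel.close()
  channel.tryReceive()
  return channel.isClosedForReceive
}

fun main() {
  println(closedForReceiveAfter(cancel = true))  // true: the buffer was discarded
  println(closedForReceiveAfter(cancel = false)) // false: two elements are still waiting
  println(closedForReceiveAfterDraining())       // true: closed and fully drained
}
```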
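The isClosedForReceive property can be used along with channel.receive() to iterate and get items from a channel one at a time. Keep in mind that the check and the receive are separate steps, so if the channel is closed in between without a new element arriving, receive() throws a ClosedReceiveChannelException:
while (!kotlinChannel.isClosedForReceive) {
  val value = kotlinChannel.receive()
  println(value)
}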
Channels are not tied to any native resource and they don’t have to be closed to release their memory; hence, simply dropping all the references to a channel is fine. When the garbage collector runs, it will clean out those references.
Other important methods are send and receive. You can send items to the channel with the method send(element: E) and receive from it with receive(): E.
This is typical usage for a channel:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() {
  // 1
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")
  // 2
  val kotlinChannel = Channel<String>()
  runBlocking {
    // 3
    GlobalScope.launch {
      for (fruit in fruitArray) {
        // 4
        kotlinChannel.send(fruit)
        // 5
        if (fruit == "Pear") {
          // 6
          kotlinChannel.close()
          // Stop producing: sending into a closed channel
          // would throw ClosedSendChannelException
          break
        }
      }
    }
    // 7
    for (fruit in kotlinChannel) {
      println(fruit)
    }
    // 8
    println("Done!")
  }
}
Output:
Apple
Banana
Pear
Done!
Here is a breakdown of each part of the above snippet; you can find an executable version in the starter project, in the file called ChannelsIntro.kt:
- An array of string items.
- Create a channel with the default capacity, i.e., 0.
- Set up the producer.
- Send data into the channel.
- Conditional check: is the current item equal to Pear?
- Signal the closure of the channel by calling close() on the channel.
- Set up the consumer, which prints the received values using a for loop (until the channel is closed).
- Print the final Done! status.
In the previous example, you create a Channel of String objects. Then, inside the body of the launch coroutine builder, you iterate over an Array<String> and put each element into the channel using the send function. While iterating, you check if the current value equals Pear, in which case you close the channel by invoking the close method. This is an example of a condition for the closing of the channel.
On the receiving side, you use a normal for loop to consume all the elements available in the channel. The for loop knows when the channel is closed because it relies on the channel’s underlying iterator.
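To make the role of the iterator more concrete, the sketch below consumes a channel through its iterator explicitly, which is roughly what the for loop does for you. The function name collectWithIterator is hypothetical; iterator(), hasNext() and next() belong to the ReceiveChannel API.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun collectWithIterator(): List<String> = runBlocking {
  val channel = Channel<String>()
  launch {
    listOf("Apple", "Banana", "Pear").forEach { channel.send(it) }
    channel.close()
  }
  val received = mutableListOf<String>()
  val iterator = channel.iterator()
  // hasNext() suspends until an element arrives and
  // returns false once the channel is closed and empty
  while (iterator.hasNext()) {
    received += iterator.next()
  }
  received
}

fun main() {
  println(collectWithIterator()) // [Apple, Banana, Pear]
}
```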
The for loop solution is excellent because it allows you to use channels in the same pattern that you’d use for iterating over a normal collection. If you want more control over what you’re doing, you can consume the channel using code like this:
while (!kotlinChannel.isClosedForReceive) {
  val value = kotlinChannel.receive()
  println(value)
}
However, there is yet another way to iterate over the channel values, using Kotlin’s repeat() function:
// Another way to iterate over the channel values
// You use channel.receive() to
// get the messages one by one
repeat(3) {
  val fruit = kotlinChannel.receive()
  println(fruit)
}
Here, you explicitly use the receive method, but you have to know exactly how many elements you’re getting from the channel, which is not always possible. If you pass 4 instead of 3 as the argument of the repeat function, you’ll get a ClosedReceiveChannelException like this:
Apple
Banana
Pear
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
at kotlinx.coroutines.channels.Closed.getReceiveException(AbstractChannel.kt:1070)
It’s interesting to note that the top frame of the stack trace isn’t receive itself but the channel’s internal Closed token. As described earlier, closing a channel conceptually works by sending a special close token over it; as you saw in the SendChannel interface, close is a regular function, not a suspend one. When the receiver requests more items than were sent, receive finds that token instead of an element and throws the ClosedReceiveChannelException that the token provides. If, on the other hand, you pass a value smaller than the number of available objects, you’re going to miss some data.
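If you don’t know the number of elements in advance, one option is to ask for a result object instead of an exception. The sketch below assumes kotlinx.coroutines 1.5 or later, where receiveCatching is available; the function name receiveUpTo and the "closed" marker string are made up for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Tries to receive `count` elements from a channel that only ever gets three.
fun receiveUpTo(count: Int): List<String> = runBlocking {
  val channel = Channel<String>()
  launch {
    listOf("Apple", "Banana", "Pear").forEach { channel.send(it) }
    channel.close()
  }
  val log = mutableListOf<String>()
  repeat(count) {
    // receiveCatching() returns a ChannelResult instead of throwing
    // when the channel is closed; getOrNull() is null in that case
    val result = channel.receiveCatching()
    log += result.getOrNull() ?: "closed"
  }
  log
}

fun main() {
  println(receiveUpTo(4)) // [Apple, Banana, Pear, closed]
}
```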
Understanding closed channels
In order to understand the state of the channel, you can use two handy properties: isClosedForReceive and isClosedForSend.
while (!kotlinChannel.isClosedForReceive) {
  val fruit = kotlinChannel.receive()
  println(fruit)
}
while (!kotlinChannel.isClosedForReceive) {
  val fruit = kotlinChannel.receive()
  delay(10)
  println(fruit)
}
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
  fun produceFruits() = GlobalScope.produce<String> {
    for (fruit in fruitArray) {
      send(fruit)
      // Conditional close
      if (fruit == "Pear") {
        // Signal the closure of the channel
        close()
        // Stop producing: sending after close() would throw ClosedSendChannelException
        break
      }
    }
  }
  runBlocking {
    val fruits = produceFruits()
    fruits.consumeEach { println(it) }
    println("Done!")
  }
}
Pipelines
With channels, you always have a producer and a consumer. Sometimes, a consumer receives the data from a channel, applies some transformations and becomes the producer of a new channel. When a consumer of one channel becomes the producer of another channel, you create a pipeline. The source channel might be infinite and the pipeline might contain different steps.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
data class Fruit(override val name: String, override val color: String) : Item
data class Vegetable(override val name: String, override val color: String) : Item
@ExperimentalCoroutinesApi
fun main() {
  // ------------ Helper Methods ------------
  fun isFruit(item: Item): Boolean = item is Fruit
  fun isRed(item: Item): Boolean = (item.color == "Red")
  // ------------ Pipeline ------------
  // 1
  fun produceItems() = GlobalScope.produce {
    val itemsArray = ArrayList<Item>()
    itemsArray.add(Fruit("Apple", "Red"))
    itemsArray.add(Vegetable("Zucchini", "Green"))
    itemsArray.add(Fruit("Grapes", "Green"))
    itemsArray.add(Vegetable("Radishes", "Red"))
    itemsArray.add(Fruit("Banana", "Yellow"))
    itemsArray.add(Fruit("Cherries", "Red"))
    itemsArray.add(Vegetable("Broccoli", "Green"))
    itemsArray.add(Fruit("Strawberry", "Red"))
    // Send each item in the channel
    itemsArray.forEach {
      send(it)
    }
  }
  // 2
  fun isFruit(items: ReceiveChannel<Item>) = GlobalScope.produce {
    for (item in items) {
      // Send each item in the channel only if it is a fruit
      if (isFruit(item)) {
        send(item)
      }
    }
  }
  // 3
  fun isRed(items: ReceiveChannel<Item>) = GlobalScope.produce {
    for (item in items) {
      // Send each item in the channel only if it is red in color
      if (isRed(item)) {
        send(item)
      }
    }
  }
  runBlocking {
    // 4
    val itemsChannel = produceItems()
    // 5
    val fruitsChannel = isFruit(itemsChannel)
    // 6
    val redChannel = isRed(fruitsChannel)
    // 7
    for (item in redChannel) {
      print("${item.name}, ")
    }
    // 8
    redChannel.cancel()
    fruitsChannel.cancel()
    itemsChannel.cancel()
    // 9
    println("Done!")
  }
}
interface Item {
  val name: String
  val color: String
}
Output:
Apple, Cherries, Strawberry, Done!
Fan out
In the previous example, you created a pipeline as a sequence of channels, each one with a single producer and a single consumer. Coroutines were consuming the data from a channel and testing if that data satisfied certain conditions. In the case of success, the items were put into the new channel; otherwise, they were discarded.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
typealias Predicate<E> = (E) -> Boolean
typealias Rule<E> = Pair<Channel<E>, Predicate<E>>
class Demultiplexer<E>(vararg val rules: Rule<E>) {
  suspend fun consume(recv: ReceiveChannel<E>) {
    for (item in recv) {
      // 1
      for (rule in rules) {
        // 2
        if (rule.second(item)) {
          // 3
          rule.first.send(item)
        }
      }
    }
    // 4
    closeAll()
  }
  // Closes all the demultiplexed channels
  private fun closeAll() {
    rules.forEach { it.first.close() }
  }
}
@ExperimentalCoroutinesApi
fun main() {
  data class Fruit(override val name: String, override val color: String) : Item
  data class Vegetable(override val name: String, override val color: String) : Item
  // ------------ Helper Methods ------------
  fun isFruit(item: Item) = item is Fruit
  fun isVegetable(item: Item) = item is Vegetable
  // 1
  fun produceItems(): ArrayList<Item> {
    val itemsArray = ArrayList<Item>()
    itemsArray.add(Fruit("Apple", "Red"))
    itemsArray.add(Vegetable("Zucchini", "Green"))
    itemsArray.add(Fruit("Grapes", "Green"))
    itemsArray.add(Vegetable("Radishes", "Red"))
    itemsArray.add(Fruit("Banana", "Yellow"))
    itemsArray.add(Fruit("Cherries", "Red"))
    itemsArray.add(Vegetable("Broccoli", "Green"))
    itemsArray.add(Fruit("Strawberry", "Red"))
    itemsArray.add(Vegetable("Red bell pepper", "Red"))
    return itemsArray
  }
  runBlocking {
    // 2
    val kotlinChannel = Channel<Item>()
    // 3
    val fruitsChannel = Channel<Item>()
    val vegetablesChannel = Channel<Item>()
    // 4
    launch {
      produceItems().forEach {
        kotlinChannel.send(it)
      }
      // 5
      kotlinChannel.close()
    }
    // 6
    val typeDemultiplexer = Demultiplexer(
        fruitsChannel to { item: Item -> isFruit(item) },
        vegetablesChannel to { item: Item -> isVegetable(item) }
    )
    // 7
    launch {
      typeDemultiplexer.consume(kotlinChannel)
    }
    // 8
    launch {
      for (item in fruitsChannel) {
        // Consume fruitsChannel
        println("${item.name} is a fruit")
      }
    }
    // 9
    launch {
      for (item in vegetablesChannel) {
        // Consume vegetablesChannel
        println("${item.name} is a vegetable")
      }
    }
  }
}
Output:
Apple is a fruit
Zucchini is a vegetable
Grapes is a fruit
Radishes is a vegetable
Banana is a fruit
Cherries is a fruit
Broccoli is a vegetable
Strawberry is a fruit
Red bell pepper is a vegetable
Fan in
In the previous example, you created a coroutine that was able to demultiplex the items into different channels based on certain criteria. That was a way to simulate the case in which you have one producer and many consumers.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
@ExperimentalCoroutinesApi
fun main() {
  data class Fruit(override val name: String, override val color: String) : Item
  data class Vegetable(override val name: String, override val color: String) : Item
  // ------------ Helper Methods ------------
  fun isFruit(item: Item) = item is Fruit
  fun isVegetable(item: Item) = item is Vegetable
  // 1
  fun produceItems(): ArrayList<Item> {
    val itemsArray = ArrayList<Item>()
    itemsArray.add(Fruit("Apple", "Red"))
    itemsArray.add(Vegetable("Zucchini", "Green"))
    itemsArray.add(Fruit("Grapes", "Green"))
    itemsArray.add(Vegetable("Radishes", "Red"))
    itemsArray.add(Fruit("Banana", "Yellow"))
    itemsArray.add(Fruit("Cherries", "Red"))
    itemsArray.add(Vegetable("Broccoli", "Green"))
    itemsArray.add(Fruit("Strawberry", "Red"))
    itemsArray.add(Vegetable("Red bell pepper", "Red"))
    return itemsArray
  }
  runBlocking {
    // 2
    val destinationChannel = Channel<Item>()
    // 3
    val fruitsChannel = Channel<Item>()
    val vegetablesChannel = Channel<Item>()
    // 4
    launch {
      produceItems().forEach {
        if (isFruit(it)) {
          fruitsChannel.send(it)
        }
      }
      // No more fruits: lets the forwarding loop in step 6 finish
      fruitsChannel.close()
    }
    // 5
    launch {
      produceItems().forEach {
        if (isVegetable(it)) {
          vegetablesChannel.send(it)
        }
      }
      // No more vegetables: lets the forwarding loop in step 7 finish
      vegetablesChannel.close()
    }
    // 6
    val fruitsForwarder = launch {
      for (item in fruitsChannel) {
        destinationChannel.send(item)
      }
    }
    // 7
    val vegetablesForwarder = launch {
      for (item in vegetablesChannel) {
        destinationChannel.send(item)
      }
    }
    // Close the destination once both forwarders are done;
    // otherwise, consumeEach in step 8 would never return
    launch {
      joinAll(fruitsForwarder, vegetablesForwarder)
      destinationChannel.close()
    }
    // 8
    destinationChannel.consumeEach {
      if (isFruit(it)) {
        println("${it.name} is a fruit")
      } else if (isVegetable(it)) {
        println("${it.name} is a vegetable")
      }
    }
    // 9
    coroutineContext.cancelChildren()
  }
}
Output:
Apple is a fruit
Zucchini is a vegetable
Grapes is a fruit
Banana is a fruit
Radishes is a vegetable
Cherries is a fruit
Broccoli is a vegetable
Strawberry is a fruit
Red bell pepper is a vegetable
Buffered channel
As you might have noticed above, the channel examples demonstrated previously used a default value for the capacity, called RENDEZVOUS. These kinds of channels are called unbuffered channels because the producer produces only if there’s a consumer ready to consume.
// Channel of capacity 2
val kotlinBufferedChannel = Channel<String>(2)
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() {
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
  val kotlinBufferedChannel = Channel<String>(2)
  runBlocking {
    launch {
      for (fruit in fruitArray) {
        kotlinBufferedChannel.send(fruit)
        println("Produced: $fruit")
      }
      kotlinBufferedChannel.close()
    }
    launch {
      for (fruit in kotlinBufferedChannel) {
        println("Consumed: $fruit")
        delay(1000)
      }
    }
  }
}
Output:
Produced: Apple
Produced: Banana
Consumed: Apple
Produced: Pear
Consumed: Banana
Produced: Grapes
Consumed: Pear
Produced: Strawberry
Consumed: Grapes
Consumed: Strawberry
Comparing send and offer
In the previous examples, you sent values into a channel using the send function. Depending on the channel’s capacity, send is a function that can suspend. This happens when the channel’s buffer is full or, in the case of RENDEZVOUS, when there’s no receiver ready to consume.
abstract fun offer(element: E): Boolean
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() {
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
  val kotlinChannel = Channel<String>()
  runBlocking {
    launch {
      for (fruit in fruitArray) {
        val wasSent = kotlinChannel.offer(fruit)
        if (wasSent) {
          println("Sent: $fruit")
        } else {
          println("$fruit wasn’t sent")
        }
      }
      kotlinChannel.close()
    }
    for (fruit in kotlinChannel) {
      println("Received: $fruit")
    }
    println("Done!")
  }
}
Output:
Sent: Apple
Banana wasn’t sent
Pear wasn’t sent
Grapes wasn’t sent
Strawberry wasn’t sent
Received: Apple
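Newer kotlinx.coroutines releases (1.5 and later) deprecate offer in favor of trySend, which returns a ChannelResult instead of a Boolean. If your project uses such a version, the same experiment could look roughly like the sketch below. The function name trySendFruits is hypothetical, and the messages are collected in a list rather than printed so that the outcome is easy to inspect.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun trySendFruits(): List<String> = runBlocking {
  val fruits = listOf("Apple", "Banana", "Pear")
  val channel = Channel<String>()
  val log = mutableListOf<String>()
  launch {
    for (fruit in fruits) {
      // trySend never suspends; it succeeds only if a receiver is
      // already waiting (rendezvous) or the buffer has space
      val result = channel.trySend(fruit)
      log += if (result.isSuccess) "Sent: $fruit" else "$fruit wasn't sent"
    }
    channel.close()
  }
  // The consumer is already waiting when the producer starts,
  // so only the first trySend finds a receiver
  for (fruit in channel) log += "Received: $fruit"
  log
}

fun main() {
  trySendFruits().forEach(::println)
}
```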
Comparing receive and poll
In the previous section, you’ve seen that a producer can use offer as a non-suspending version of the send function. What about the consumer? In this case, the non-suspending version of receive is the poll function, whose signature is:
abstract fun poll(): E?
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() {
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
  val kotlinChannel = Channel<String>()
  runBlocking {
    launch {
      for (fruit in fruitArray) {
        if (fruit == "Pear") {
          break
        }
        kotlinChannel.send(fruit)
        println("Sent: $fruit")
      }
    }
    launch {
      repeat(fruitArray.size) {
        val fruit = kotlinChannel.poll()
        if (fruit != null) {
          println("Received: $fruit")
        } else {
          println("Channel is empty")
        }
        delay(500)
      }
      println("Done!")
    }
  }
}
Output:
Received: Apple
Sent: Apple
Received: Banana
Sent: Banana
Channel is empty
Channel is empty
Channel is empty
Done!
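As with offer, newer kotlinx.coroutines releases (1.5 and later) deprecate poll, this time in favor of tryReceive. The following sketch assumes such a version; the function name drainWithoutSuspending is made up. Because neither trySend nor tryReceive suspends, no coroutine is needed here.

```kotlin
import kotlinx.coroutines.channels.Channel

fun drainWithoutSuspending(): List<String> {
  // A buffer of 3 lets trySend succeed without a waiting receiver
  val channel = Channel<String>(3)
  channel.trySend("Apple")
  channel.trySend("Banana")
  val log = mutableListOf<String>()
  repeat(3) {
    // tryReceive returns a ChannelResult; getOrNull() is null
    // when there is nothing to receive right now
    val fruit = channel.tryReceive().getOrNull()
    log += if (fruit != null) "Received: $fruit" else "Channel is empty"
  }
  return log
}

fun main() {
  drainWithoutSuspending().forEach(::println)
}
```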
Error handling
As you have seen in the previous examples, exceptions play an important role in the way you can use a channel. It’s crucial to understand what the main exceptions are and what you should do when they happen. You have to consider two main use cases, depending on whether you’re on the producer side or on the consumer side of the channel.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() {
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
  val kotlinChannel = Channel<String>()
  runBlocking {
    launch {
      for (fruit in fruitArray) {
        // Conditional close
        if (fruit == "Grapes") {
          // Signal the closure of the channel
          kotlinChannel.close()
        }
        kotlinChannel.send(fruit)
      }
    }
    repeat(fruitArray.size) {
      try {
        val fruit = kotlinChannel.receive()
        println(fruit)
      } catch (e: Exception) {
        println("Exception raised: ${e.javaClass.simpleName}")
      }
    }
    println("Done!")
  }
}
Output:
Apple
Banana
Pear
Exception raised: ClosedReceiveChannelException
Exception raised: ClosedReceiveChannelException
Done!
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() {
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes", "Strawberry")
  val kotlinChannel = Channel<String>()
  runBlocking {
    launch {
      for (fruit in fruitArray) {
        try {
          kotlinChannel.send(fruit)
        } catch (e: Exception) {
          println("Exception raised: ${e.javaClass.simpleName}")
        }
      }
      println("Done!")
    }
    repeat(fruitArray.size - 1) {
      val fruit = kotlinChannel.receive()
      // Conditional close
      if (fruit == "Grapes") {
        // Signal the closure of the channel
        kotlinChannel.close()
      }
      println(fruit)
    }
  }
}
Output:
Apple
Banana
Pear
Grapes
Exception raised: ClosedSendChannelException
Done!
Comparing Channels to Java Queues
As mentioned, Java offers a similar solution for handling streams, called Queue<E>, which is an interface and has several implementations. Take a look at an implementation of the BlockingQueue<E> interface, as it supports behavior similar to that of Channel: waiting until the queue has space before inserting an element.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
  public static void main(String[] args) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    System.out.println("Beginning:");
    try {
      System.out.println("Let’s put in basket: Apple");
      queue.put("Apple");
      System.out.println("Let’s put in basket: Banana");
      queue.put("Banana");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Done!");
  }
}
Output:
Beginning:
Let’s put in basket: Apple
Let’s put in basket: Banana
Done!
import java.util.concurrent.LinkedBlockingQueue
import kotlinx.coroutines.*
fun main(args: Array<String>) {
  // 1
  val queue = LinkedBlockingQueue<Int>()
  runBlocking {
    // 2
    launch {
      (1..5).forEach {
        queue.put(it)
        yield()
        println("Produced ${it}")
      }
    }
    // 3
    launch {
      while (true) {
        println("Consumed ${queue.take()}")
        yield()
      }
    }
    println("Done!")
  }
}
Output:
Consumed 1
Produced 1
Consumed 2
Produced 2
Consumed 3
Produced 3
Consumed 4
Produced 4
Consumed 5
Produced 5
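For comparison, here is a minimal sketch of the same producer and consumer built on a Channel instead of a LinkedBlockingQueue. In the queue version, take() blocks the thread once the queue is empty; with a channel, send and receive only suspend the coroutine, and closing the channel gives the consumer loop a clean way to finish. The function name produceAndConsume is hypothetical, and the messages are collected in a list rather than printed.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun produceAndConsume(): List<String> = runBlocking {
  val channel = Channel<Int>()
  val log = mutableListOf<String>()
  // Producer: suspends on send until the consumer is ready
  launch {
    for (i in 1..5) channel.send(i)
    channel.close()
  }
  // Consumer: the loop ends when the channel is closed,
  // so no thread is left blocked on an empty queue
  launch {
    for (i in channel) log += "Consumed $i"
  }.join()
  log += "Done!"
  log
}

fun main() {
  produceAndConsume().forEach(::println)
}
```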
Key points
- Channels provide the functionality for sending and receiving streams of values.
- Channel implements both the SendChannel and ReceiveChannel interfaces; therefore, it can be used for sending and receiving streams of values.
- A Channel can be closed. When that happens, you can’t send elements to it anymore, and receivers can only get the elements that were sent before the close.
- The send() method either adds the value to a channel or suspends the coroutine until there is space in the channel.
- The receive() method returns a value from a channel if one is available; otherwise, it suspends the coroutine until a value becomes available.
- The offer() method can be used as an alternative to send(). Unlike the send() method, offer() doesn’t suspend the coroutine; it returns false instead. It returns true in case of a successful operation.
- poll(), similarly to offer(), doesn’t suspend the coroutine, but returns null if the channel is empty.
- Java’s BlockingQueue behaves similarly to Kotlin’s Channel; the main difference is that the current thread gets blocked if the operation of inserting or retrieving is unavailable at the moment.