17
Sequence & Flow
Written by Massimo Carli
In Chapter 16, “Handling Side Effects”, you learned how to use the IO<T> monad as a special case of the State<S, T> data type. You also learned that Kotlin provides coroutines to handle side effects as pure functions in a more idiomatic way. In this chapter, you’ll learn everything you need to know about the following special Kotlin data types:
- Sequence<T>
- Flow<T>
You’ll also have a quick overview of SharedFlow<T> and StateFlow<T>.
You’ll learn how these data types work from a functional programming point of view. In particular, you’ll answer the following questions for each of these:
- What is the context they provide?
- Are they a functor?
- Are they an applicative functor?
- Are they a monad?
Note: If you don’t know coroutines yet, Kotlin Coroutines by Tutorials is the book for you.
This chapter is a big exercise that helps you take the concepts you’ve learned so far and apply them to the types you use every day in your job. It’s time to have fun!
The Sequence<T> data type
In Chapter 9, “Data Types”, you learned that List<T> is a data type with the functor and monad superpowers with the map and flatMap functions. In Chapter 4, “Expression Evaluation, Laziness & More About Functions”, you also learned that laziness is one of the main characteristics of functional programming. To remind you what this means, open ListDataType.kt in this chapter’s material and write the following code:
fun main() {
  listOf(1, 2, 3, 4, 5) // 1
    .filter(filterOdd.logged("filterOdd")) // 2
    .map(double.logged("double")) // 3
}
In this code, you:
1. Use listOf as a builder for a List<Int> with five elements of type Int.
2. Invoke filter, passing the reference to the logged version of filterOdd.
3. Use map to transform the filter’s values using a logged version of double.
Note: filterOdd and double are two very simple functions you find in Util.kt in the lib sub-package. logged is a utility higher-order function that decorates another function with a log message. Take a look at their simple implementation, if you want.
The interesting fact about the previous code happens when you run it, getting the following output:
filterOdd(1) = false
filterOdd(2) = true
filterOdd(3) = false
filterOdd(4) = true
filterOdd(5) = false
double(2) = 4
double(4) = 8
This happens because, in each line, you:
1. Create a List<Int> with five elements.
2. Invoke filter, which returns another List<Int> containing only the even values. It’s crucial to see that filterOdd has been invoked for all the elements of the original List<Int>.
3. Use map, getting a new List<Int> with the double of the values in the previous List<Int>.
With this code, you basically created three lists without using any of the individual lists’ values. What happens if you don’t really need the values in the List<Int>? In this case, you started with a List<Int> of five elements. What if the list has a lot more elements? What if the elements in the List<T> are infinite?
Well, you don’t have to blame the List<T> data type because its job is to contain an ordered collection of elements of type T. That’s why it’s been created that way. That’s its context, or purpose, if you will. Another way to say it is that List<T> is eager.
If you don’t want to keep all the possible values in a List<T>, Kotlin provides the Sequence<T> data type.
Open SequenceDataType.kt and write the following code:
fun main() {
  sequenceOf(1, 2, 3, 4, 5) // HERE
    .filter(filterOdd.logged("filterOddSeq"))
    .map(double.logged("doubleSeq"))
}
This code differs from the previous one because of the use of sequenceOf instead of listOf. More importantly, if you run the code, you’ll get nothing as output. This is because Sequence<T> is lazy. To actually consume the values in the Sequence<Int> you just created, you need a terminal operator. To see how, add .count() to the end of the method chain. It should now look like this:
fun main() {
  sequenceOf(1, 2, 3, 4, 5)
    .filter(filterOdd.logged("filterOddSeq"))
    .map(double.logged("doubleSeq"))
    .count() // HERE
}
Here, you’re just counting the elements in the sequence and, to do it, you need to consume all of them. This time, running the code, you’ll get the following:
filterOddSeq(1) = false
filterOddSeq(2) = true
doubleSeq(2) = 4
filterOddSeq(3) = false
filterOddSeq(4) = true
doubleSeq(4) = 8
filterOddSeq(5) = false
Note how the order of the log messages is different from the one you got from the List<T>. In that case, each operator read the values from the input List<T>. Now, the chain of operators is called for each value you consume.
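The same per-value evaluation is what makes an unbounded source workable. The following is a minimal sketch, not part of the chapter’s material, that uses the standard library’s generateSequence to define an infinite sequence and counts how often the predicate runs. The names evenDoubles and predicateCalls are assumptions made for illustration.

```kotlin
// Counts how many times the filter predicate is invoked.
var predicateCalls = 0

// Hypothetical helper: the first [n] even natural numbers, doubled.
fun evenDoubles(n: Int): List<Int> =
  generateSequence(1) { it + 1 }   // 1, 2, 3, ... with no upper bound
    .filter { predicateCalls++; it % 2 == 0 }
    .map { it * 2 }
    .take(n)                       // stop pulling values after n results
    .toList()                      // terminal operator: triggers evaluation

fun main() {
  println(evenDoubles(3))   // [4, 8, 12]
  println(predicateCalls)   // 6
}
```

Only six values are ever inspected, because take stops asking for more once it has three results. The same chain on a List<T> could not even be built, since the source never ends.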
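Note: If you’re curious and want to look at the definition of Sequence<T>, you’ll find that, like the Iterable<T> interface, it declares a single iterator function marked with the operator keyword, which allows its use in a for loop. The two interfaces differ in their extension functions: the Sequence<T> versions of intermediate operators like filter and map are lazy.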
This clarifies the context for a Sequence<T> as a container that produces the values it contains only when required. That means it’s lazy. But is Sequence<T> a functor?
Sequence<T> as a functor
Looking at the Sequence<T> documentation, you see the definition of map with the following signature:
public fun <T, R> Sequence<T>.map(transform: (T) -> R): Sequence<R>
fun interface Generator<T> {
  fun generate(n: Int): List<T>
}

fun <T, R> Generator<T>.map(fn: (T) -> R): Generator<R> = object : Generator<R> {
  override fun generate(n: Int): List<R> = this@map.generate(n).map(fn)
}

fun <A, B> funGenerator(bGen: Generator<B>): Generator<Fun<A, B>> =
  bGen.map { b: B -> { b } }
@Test
fun `Identity Functor Law for Sequences`() {
  // Generator of functions of type (Int) -> String
  val intToStringFunGenerator =
    funGenerator<Int, String>(StringGenerator(5))
  val i = { s: String -> s } // Identity function on String
  100.times { // Repeat the check 100 times
    val f = intToStringFunGenerator.one() // Pick one generated function
    val seq = IntGenerator.generate(5).asSequence() // Generated input values
    val list1 = seq.map(f compose i).toList() // Map with f followed by identity
    val list2 = seq.map(f).toList() // Map with f alone
    Truth.assertThat(list1).isEqualTo(list2) // The results must be equal
  }
}

@Test
fun `Composition Functor Law for Sequences`() {
  val intToStringFunGenerator =
    funGenerator<Int, String>(StringGenerator(5))
  // Generator of functions of type (String) -> Long
  val stringToLongFunGenerator =
    funGenerator<String, Long>(LongGenerator)
  100.times {
    val f = intToStringFunGenerator.one()
    val g = stringToLongFunGenerator.one() // Second function to compose with f
    val seq = IntGenerator.generate(5).asSequence()
    val list1 = seq.map(f compose g).toList() // Map once with the composition
    val list2 = seq.map(f).map(g).toList() // Map twice, with f and then g
    Truth.assertThat(list1).isEqualTo(list2) // The results must be equal
  }
}
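The two laws can also be checked by hand on fixed inputs, without any generator. The sketch below is only an illustration using the standard library. The andThen helper is an assumption standing in for the book’s compose and applies the receiver first.

```kotlin
// Left-to-right composition: first the receiver, then g (illustrative helper).
infix fun <A, B, C> ((A) -> B).andThen(g: (B) -> C): (A) -> C = { a -> g(this(a)) }

// Identity law: mapping the identity function changes nothing.
fun identityLawHolds(input: List<Int>): Boolean {
  val id = { x: Int -> x }
  return input.asSequence().map(id).toList() == input
}

// Composition law: mapping once with (f andThen g) equals mapping with f, then g.
fun compositionLawHolds(input: List<Int>): Boolean {
  val f = { x: Int -> "n$x" }
  val g = { s: String -> s.length.toLong() }
  val fused = input.asSequence().map(f andThen g).toList()
  val chained = input.asSequence().map(f).map(g).toList()
  return fused == chained
}

fun main() {
  val sample = listOf(1, 20, 300)
  println(identityLawHolds(sample))     // true
  println(compositionLawHolds(sample))  // true
}
```

A check on a few fixed values is weaker evidence than the property-based tests, which try many generated functions and inputs.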
Sequence<T> as an applicative functor
You just proved that the Sequence<T> data type is a functor because of the existing implementation of map. But what about applicative functors? Looking at the Kotlin documentation, you don’t see any higher-order functions like your ap and appl. No problem: you can do this!
fun <A, B> Sequence<A>.ap(
  fn: Sequence<(A) -> B>
): Sequence<B> = TODO()

fun <A, B> Sequence<A>.ap(fn: Sequence<(A) -> B>): Sequence<B> =
  sequence { // Build the result lazily
    val iterator = iterator() // Iterator over the receiver's values
    while (iterator.hasNext()) {
      val fnIterator = fn.iterator() // Restart the functions for each value
      val item = iterator.next()
      while (fnIterator.hasNext()) {
        yield(fnIterator.next().invoke(item)) // Apply each function to the value
      }
    }
  }

infix fun <A, B> Sequence<(A) -> B>.appl(a: Sequence<A>) = a.ap(this)
fun main() {
  data class User( // Local data class with three properties
    val id: Int,
    val name: String,
    val email: String
  )
  val userBuilder = ::User.curry() // Curried version of the User constructor
  val userBuilderSeq = sequenceOf(userBuilder) // Sequence containing only the builder
  val idSeq = sequenceOf(10, 20, 30) // Candidate values for each parameter
  val nameSeq = sequenceOf("Minnie", "Donald", "Mickey")
  val emailSeq =
    sequenceOf("aaaaaa@aaaaa.com", "bbbbb@bbbbbb.com")
  val userSeq =
    userBuilderSeq appl idSeq appl nameSeq appl emailSeq // Apply one parameter at a time
  userSeq.forEach(::println) // Consume the sequence, printing every User
}
User(id=10, name=Minnie, email=aaaaaa@aaaaa.com)
User(id=20, name=Minnie, email=aaaaaa@aaaaa.com)
// ...
User(id=20, name=Mickey, email=bbbbb@bbbbbb.com)
User(id=30, name=Mickey, email=bbbbb@bbbbbb.com)
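The order of that output follows from ap applying every function to every value, with the values in the outer loop. Here is a smaller, self-contained sketch of the same idea. The label function is a hand-curried stand-in for ::User.curry(), and both label and labels are illustrative assumptions.

```kotlin
// Same shape as the chapter's ap: every function is applied to every value.
fun <A, B> Sequence<A>.ap(fn: Sequence<(A) -> B>): Sequence<B> =
  sequence {
    for (item in this@ap) {   // outer loop: the values
      for (f in fn) {         // inner loop: the functions
        yield(f(item))
      }
    }
  }

infix fun <A, B> Sequence<(A) -> B>.appl(a: Sequence<A>): Sequence<B> = a.ap(this)

// Hand-curried two-parameter function (hypothetical example).
val label: (Int) -> (String) -> String = { id -> { name -> "$id:$name" } }

fun labels(): List<String> =
  (sequenceOf(label) appl sequenceOf(1, 2) appl sequenceOf("a", "b")).toList()

fun main() {
  println(labels()) // [1:a, 2:a, 1:b, 2:b]
}
```

With two ids and two names you get four results, so this ap behaves like a Cartesian product rather than a pairwise zip.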
Sequence<T> as a monad
Finally, is Sequence<T> a monad? Of course it is, because of the flatMap operation that Kotlin APIs provide with the following signature, similar to the Kleisli category:
fun <T, R> Sequence<T>.flatMap(
  transform: (T) -> Sequence<R>
): Sequence<R>

fun main() {
  // ...
  val seqTo = { n: Int -> (1..n).toList().asSequence() }
  val seqOfSeq = sequenceOf(1, 2, 3, 4, 5).flatMap(seqTo)
  seqOfSeq.forEach { print("$it ") }
}
1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
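The link to the Kleisli category can be made concrete: flatMap is what lets you compose two functions of type (A) -> Sequence<B> and (B) -> Sequence<C>. The following is a minimal sketch of that composition. The names fish, upTo, twice and composed are assumptions used only here.

```kotlin
// Kleisli-style composition for functions that return a Sequence.
infix fun <A, B, C> ((A) -> Sequence<B>).fish(
  g: (B) -> Sequence<C>
): (A) -> Sequence<C> = { a -> this(a).flatMap(g) }

// Hypothetical building blocks.
val upTo: (Int) -> Sequence<Int> = { n -> (1..n).asSequence() }
val twice: (Int) -> Sequence<String> = { n -> sequenceOf("$n", "$n") }

fun composed(n: Int): List<String> = (upTo fish twice)(n).toList()

fun main() {
  println(composed(2)) // [1, 1, 2, 2]
}
```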
The Flow<T> data type
In Chapter 16, “Handling Side Effects”, you learned that Kotlin allows you to achieve with suspendable functions what you can do with the IO<T> monad. In this chapter, you’ve already learned how to produce a theoretically infinite sequence of values in a lazy way. If the values you want to generate are the result of a suspendable function, the Flow<T> data type is what you need.
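As a minimal sketch of that idea, assuming kotlinx-coroutines-core is on the classpath, the flow below emits values produced by a suspendable function. slowSquare, squares and firstSquares are hypothetical names, and delay only stands in for real asynchronous work.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical suspendable source of values.
suspend fun slowSquare(n: Int): Int {
  delay(10) // stands in for real asynchronous work
  return n * n
}

// A cold, potentially infinite flow: nothing runs until it is collected.
fun squares(): Flow<Int> = flow {
  var n = 1
  while (true) {
    emit(slowSquare(n++))
  }
}

// Collects only the first [n] values, then cancels the upstream flow.
fun firstSquares(n: Int): List<Int> = runBlocking {
  squares().take(n).toList()
}

fun main() {
  println(firstSquares(3)) // [1, 4, 9]
}
```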
Flow<T> as a functor
To prove that Flow<T> is a functor, you could repeat the same process you did for Sequence<T> using property-based testing. In this case, you’ll keep things easier, implementing some practical examples.
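fun inputStringFlow(question: String = "") = flow { // Flow builder
  val scanner = java.util.Scanner(System.`in`) // Read from standard input
  print(question) // Prompt the user
  while (scanner.hasNextLine()) { // Keep reading while input is available
    val line = scanner.nextLine()
    if (line.isNullOrEmpty()) { // An empty line ends the flow
      break
    }
    emit(line) // Emit the line to the collector
    print(question) // Prompt again
  }
  scanner.close() // Release the scanner
}

fun main() {
  val strLengthFlow = inputStringFlow("Insert a word: ") // Flow<String> of user input
    .map { str -> // Transform each String into a Pair<String, Int>
      str to str.length
    }
  runBlocking { // Provide a coroutine scope for the collection
    strLengthFlow.collect { strInfo -> // Collect and print each pair
      println("${strInfo.first} has length ${strInfo.second}")
    }
  }
}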
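The same map can be exercised without typing anything by swapping the interactive source for a fixed one. This is only a sketch, assuming kotlinx-coroutines-core is available. describeWords is a hypothetical helper that replaces inputStringFlow with asFlow.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Maps a fixed flow of words to descriptions, mirroring the example above.
fun describeWords(words: List<String>): List<String> = runBlocking {
  words.asFlow()                      // Flow<String> from a fixed source
    .map { str -> str to str.length } // Flow<Pair<String, Int>>
    .map { (word, length) -> "$word has length $length" }
    .toList()                         // terminal operator
}

fun main() {
  describeWords(listOf("flow", "sequence")).forEach(::println)
  // flow has length 4
  // sequence has length 8
}
```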
Flow<T> as an applicative functor
To see if Flow<T> also behaves as an applicative functor, either repeat what you did for Sequence<T> or just follow along. In FlowDataType.kt, add the following code:
fun <A, B> Flow<A>.ap(fn: Flow<(A) -> B>): Flow<B> = flow { // Build a new Flow<B>
  collect { a -> // Collect every value of the receiver
    fn.collect { f -> // Collect every function for that value
      emit(f(a)) // Emit the result of applying the function
    }
  }
}

infix fun <A, B> Flow<(A) -> B>.appl(
  a: Flow<A>
) = a.ap(this) // Infix version with receiver and parameter swapped

fun main() {
  val userBuilder = { id: Int ->
    { name: String ->
      { email: String -> User(id, name, email) }
    }
  }
  val userBuilderFlow = flowOf(userBuilder)
  val idFlow = listOf(10, 20, 30).asFlow()
  val nameFlow = listOf("Pippo", "Pippo2", "Pippo3").asFlow()
  val emailFlow = listOf(
    "pippo@pippo.com", "pippo2@pippo.com", "pippo3@pippo.com"
  ).asFlow()
  val userFlow =
    userBuilderFlow appl idFlow appl nameFlow appl emailFlow
  runBlocking {
    userFlow.collect(::println)
  }
}
User(id=10, name=Pippo, email=pippo@pippo.com)
User(id=20, name=Pippo, email=pippo@pippo.com)
// ...
User(id=20, name=Pippo3, email=pippo3@pippo.com)
User(id=30, name=Pippo3, email=pippo3@pippo.com)
Flow<T> as a monad
To answer the last question, you’ll implement a more complex example using some of the code you already implemented in Chapter 14, “Error Handling With Functional Programming”, that you can find in the material for this project. You basically want to use inputStringFlow to allow a user to insert some text to search for in the TV show database using the TVmaze API. Now, you’re in the world of coroutines, so you should use their power. It’s time for an interesting exercise to improve your functional thinking.
fun doSomeWork(name: String): Int = 10

suspend fun doSomeBgWork(
  ctx: CoroutineContext,
  name: String
): Int = withContext(ctx) {
  doSomeWork(name)
}
fun main() {
  // Requires a function reference and a curry overload for suspendable functions
  ::doSomeBgWork.curry()
}
typealias SuspendFun<A, B> = suspend (A) -> B // Suspendable function of one parameter
typealias SuspendFun2<A, B, C> = suspend (A, B) -> C // Suspendable function of two parameters
typealias SuspendChain2<A, B, C> =
  suspend (A) -> suspend (B) -> C // Curried form of SuspendFun2

fun <A, B, C> SuspendFun2<A, B, C>.curry(): SuspendChain2<A, B, C> =
  { a: A ->
    { b: B ->
      this(a, b)
    }
  }
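To see the curried form in use, here is a minimal self-contained sketch, assuming kotlinx-coroutines-core is on the classpath. It repeats the two type aliases and curry from above. nameLength and lengthOnDefault are hypothetical stand-ins for doSomeBgWork and its caller.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext

typealias SuspendFun2<A, B, C> = suspend (A, B) -> C
typealias SuspendChain2<A, B, C> = suspend (A) -> suspend (B) -> C

fun <A, B, C> SuspendFun2<A, B, C>.curry(): SuspendChain2<A, B, C> =
  { a: A -> { b: B -> this(a, b) } }

// Hypothetical stand-in for the chapter's doSomeBgWork.
suspend fun nameLength(ctx: CoroutineContext, name: String): Int =
  withContext(ctx) { name.length }

fun lengthOnDefault(name: String): Int = runBlocking {
  val curried = ::nameLength.curry()           // suspend (CoroutineContext) -> suspend (String) -> Int
  val onDefault = curried(Dispatchers.Default) // fix the context first
  onDefault(name)                              // then supply the name
}

fun main() {
  println(lengthOnDefault("Kotlin")) // 6
}
```

Fixing the CoroutineContext first gives you a one-parameter suspendable function that you can reuse for many names.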
CoroutineContext as a state
To relate the State<S, T> monad to what you saw about suspendable functions, look again at doSomeBgWork, which you wrote in Basic.kt:
suspend fun doSomeBgWork(ctx: CoroutineContext, name: String): Int =
  withContext(ctx) {
    doSomeWork(name)
  }

suspend fun doSomeMoreBgWork(
  ctx: CoroutineContext,
  name: String
): Pair<CoroutineContext, Int> = withContext(ctx) {
  ctx to doSomeWork(name)
}

typealias SuspendStateTransformer<S, T> =
  suspend (S) -> Pair<S, T>

data class SuspendableState<S, T>(
  val sst: SuspendStateTransformer<S, T>
) {
  companion object {
    @JvmStatic
    fun <S, T> lift(
      value: T
    ): SuspendableState<S, T> =
      SuspendableState { state -> state to value }
  }
}

fun <S, A, B> SuspendableState<S, A>.map(
  fn: SuspendFun<A, B>
): SuspendableState<S, B> =
  SuspendableState { s0: S ->
    val (s1, a) = this.sst(s0)
    s1 to fn(a)
  }

fun <S, A, B> SuspendableState<S, A>.flatMap(
  fn: suspend (A) -> SuspendableState<S, B>
): SuspendableState<S, B> =
  SuspendableState { s0: S ->
    val (s1, a) = this.sst(s0)
    fn(a).sst(s1)
  }
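A small usage sketch can show how the state is threaded through flatMap and map. It assumes kotlinx-coroutines-core and repeats trimmed versions of the definitions above so that it stands alone. The counter-based nextId and twoIds are hypothetical and use an Int state instead of a CoroutineContext to keep the result easy to read.

```kotlin
import kotlinx.coroutines.runBlocking

typealias SuspendStateTransformer<S, T> = suspend (S) -> Pair<S, T>

data class SuspendableState<S, T>(val sst: SuspendStateTransformer<S, T>)

fun <S, A, B> SuspendableState<S, A>.map(
  fn: suspend (A) -> B
): SuspendableState<S, B> =
  SuspendableState { s0: S ->
    val (s1, a) = sst(s0)
    s1 to fn(a)
  }

fun <S, A, B> SuspendableState<S, A>.flatMap(
  fn: suspend (A) -> SuspendableState<S, B>
): SuspendableState<S, B> =
  SuspendableState { s0: S ->
    val (s1, a) = sst(s0)
    fn(a).sst(s1) // the next step starts from the updated state
  }

// Hypothetical step: returns the current counter and increments the state.
val nextId = SuspendableState<Int, Int> { counter -> (counter + 1) to counter }

// Runs two steps in a row, starting from the given state.
fun twoIds(start: Int): Pair<Int, String> = runBlocking {
  nextId
    .flatMap { first -> nextId.map { second -> "$first-$second" } }
    .sst(start)
}

fun main() {
  println(twoIds(10)) // (12, 10-11)
}
```

The second step sees 11 rather than 10, which shows that flatMap passes the updated state along instead of restarting from the initial one.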
Back to the TV show
In the previous section, you created the SuspendableState<S, T> data type and implemented lift, map and flatMap. How can you use these for getting data about a TV show? In the tools sub-package in this chapter’s material, you find TvShowFetcher and TvShowParser for, respectively, fetching and parsing data using the TVmaze API.
suspend fun fetchTvShowResult( // Suspendable function
  ctx: CoroutineContext,
  query: String
): Result<String> = // The outcome is wrapped in a Result<String>
  withContext(ctx) { // Run in the given CoroutineContext
    try {
      Result.success(TvShowFetcher.fetch(query)) // Success with the fetched response
    } catch (ioe: IOException) {
      Result.failure(ioe) // Failure wrapping the exception
    }
  }

suspend fun parseTvShowResult(
  ctx: CoroutineContext,
  json: String
): Result<List<ScoredShow>> =
  withContext(ctx) {
    try {
      Result.success(TvShowParser.parse(json))
    } catch (e: Exception) {
      Result.failure(e)
    }
  }

val fetchSuspend: (String) -> SuspendableState<
  CoroutineContext, Result<String>> = { query ->
  SuspendableState { ctx: CoroutineContext ->
    ctx to fetchTvShowResult(ctx, query)
  }
}

val parseSuspend: (String) -> SuspendableState<
  CoroutineContext, Result<List<ScoredShow>>> = { json ->
  SuspendableState { ctx: CoroutineContext ->
    ctx to parseTvShowResult(ctx, json)
  }
}
Composing SuspendableState<CoroutineContext, Result<T>>
To implement composition now is simpler than it seems. Open SuspendableStateResult.kt, and add the following code:
typealias SuspendStateResultTransformer<S, T> =
  suspend (S) -> Pair<S, Result<T>> // State transformer whose value is a Result<T>

data class SuspendableStateResult<S, T>( // Wrapper for the transformer
  val sst: SuspendStateResultTransformer<S, T>
) {
  companion object {
    @JvmStatic
    fun <S, T> lift( // Wrap a plain value as a success
      value: T
    ): SuspendableStateResult<S, T> =
      SuspendableStateResult { state ->
        state to Result.success(value)
      }
  }
}

fun <S, A, B> SuspendableStateResult<S, A>.map( // Apply fn only to a successful value
  fn: SuspendFun<A, B>
): SuspendableStateResult<S, B> =
  SuspendableStateResult { s0: S ->
    val (s1, a) = this.sst(s0)
    s1 to a.fold(
      onSuccess = { Result.success(fn(it)) },
      onFailure = { Result.failure(it) }
    )
  }

fun <S, A, B> SuspendableStateResult<S, A>.flatMap( // Chain a step; a failure short-circuits
  fn: suspend (A) -> SuspendableStateResult<S, B>
): SuspendableStateResult<S, B> = SuspendableStateResult { s0 ->
  val (s1, res) = sst(s0)
  res.fold(onSuccess = { a: A ->
    fn(a).sst(s1)
  }, onFailure = { throwable ->
    s1 to Result.failure(throwable)
  })
}
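The short-circuit behavior of this flatMap is easier to see on a toy pipeline than on network calls. The sketch below assumes kotlinx-coroutines-core and repeats a trimmed flatMap so that it stands alone. parse, half and parseAndHalve are hypothetical, and a List<String> log plays the role of the state.

```kotlin
import kotlinx.coroutines.runBlocking

typealias SuspendStateResultTransformer<S, T> = suspend (S) -> Pair<S, Result<T>>

data class SuspendableStateResult<S, T>(val sst: SuspendStateResultTransformer<S, T>)

fun <S, A, B> SuspendableStateResult<S, A>.flatMap(
  fn: suspend (A) -> SuspendableStateResult<S, B>
): SuspendableStateResult<S, B> = SuspendableStateResult { s0 ->
  val (s1, res) = sst(s0)
  res.fold(
    onSuccess = { a -> fn(a).sst(s1) },                              // run the next step
    onFailure = { throwable -> s1 to Result.failure<B>(throwable) }  // skip the next step
  )
}

// Hypothetical steps that record their name in the state.
val parse: (String) -> SuspendableStateResult<List<String>, Int> = { text ->
  SuspendableStateResult { log -> (log + "parse") to runCatching { text.toInt() } }
}
val half: (Int) -> SuspendableStateResult<List<String>, Int> = { n ->
  SuspendableStateResult { log -> (log + "half") to Result.success(n / 2) }
}

fun parseAndHalve(text: String): Pair<List<String>, Result<Int>> = runBlocking {
  parse(text).flatMap { half(it) }.sst(emptyList())
}

fun main() {
  val (okLog, ok) = parseAndHalve("42")
  println("$okLog ${ok.getOrNull()}")   // [parse, half] 21
  val (errLog, err) = parseAndHalve("x")
  println("$errLog ${err.isFailure}")   // [parse] true
}
```

When parsing fails, the log contains only parse. The second step never runs, and the failure travels to the end of the chain together with the state reached so far.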
Finally flatMap
It’s finally time to put everything together so you can access the TVmaze database. Open ShowSearchService.kt, and add the following code:
val fetchSuspendResult: (String) -> SuspendableStateResult<
  CoroutineContext, String> = { query ->
  SuspendableStateResult { ctx: CoroutineContext ->
    ctx to fetchTvShowResult(ctx, query)
  }
}

val parseSuspendResult: (String) -> SuspendableStateResult<
  CoroutineContext, List<ScoredShow>> = { json ->
  SuspendableStateResult { ctx: CoroutineContext ->
    ctx to parseTvShowResult(ctx, json)
  }
}

@OptIn(FlowPreview::class) // flatMapConcat requires an opt-in
suspend fun searchTvShow(ctx: CoroutineContext) =
  withContext(ctx) {
    inputStringFlow("Search Your Show: ") // Flow of queries typed by the user
      .flatMapConcat { query -> // Replace each query with a flow of results
        fetchSuspendResult(query)
          .flatMap(parseSuspendResult).sst(ctx) // Fetch, then parse, using ctx
          .second.fold(
            onSuccess = { it.asFlow() }, // Emit every show found
            onFailure = { emptyFlow() }) // Emit nothing on error
      }
  }

@OptIn(FlowPreview::class)
fun main() {
  runBlocking { // Provide a coroutine scope
    searchTvShow(Dispatchers.IO) // Search using the IO dispatcher
      .collect { // Collect every result
        println("Score: ${it.score} " +
          "Name: ${it.show.name} " +
          "Genres: ${it.show.genres}") // Print the main information
        println(it.show.summary)
        println("--------------------------")
      }
  }
}
The SharedFlow<T> & StateFlow<T> data types
SharedFlow<T> and StateFlow<T> are two additional flavors the coroutines API provides for flows. In terms of data types and the functions they provide, you can think of SharedFlow<T> and StateFlow<T> as implementations of Flow<T> with specific behavior when collected by multiple collectors.
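Because a StateFlow<T> is a Flow<T>, the operators used earlier in this chapter apply to it as well. The following is a minimal sketch, assuming kotlinx-coroutines-core is available. describe and currentDescription are hypothetical helpers.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// A StateFlow always holds a current value and hands it to new collectors.
fun describe(state: StateFlow<Int>): String = runBlocking {
  state.map { "count=$it" } // map works as on any other Flow<T>
    .first()                // take the value a new collector receives first
}

// Sets a value, then reads it back through the mapped flow.
fun currentDescription(value: Int): String {
  val counter = MutableStateFlow(0)
  counter.value = value
  return describe(counter)
}

fun main() {
  println(currentDescription(5)) // count=5
}
```

Note that map returns a plain Flow<String> here, not a StateFlow<String>, so the mapped result no longer exposes a value property.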
Key points
- The List<T> data type allows you to store an ordered collection of elements of type T in an eager way.
- All the elements of a List<T>, which is immutable, are present at the moment you create it.
- The List<T> data type is a functor and monad because of the presence of map and flatMap. You can also make it an applicative functor by implementing ap.
- The Sequence<T> data type allows you to generate a sequence of values of type T in a lazy way.
- In a Sequence<T>, map and flatMap are invoked when the values need to be collected and consumed.
- A Sequence<T> can work as a functor, applicative functor and monad.
- The Flow<T> data type is similar to Sequence<T> but in the context of a coroutine.
- Suspendable functions are an idiomatic and powerful tool to handle side effects in Kotlin.
- A Flow<T> allows you to generate a sequence, or flow, of values of type T that can be generated from suspendable functions.
- You can implement curry and composition for suspendable functions as you did for non-suspendable ones, just following the functional programming principles you learned in the previous chapters.
- You can repeat for SharedFlow<T> and StateFlow<T> the same process you followed for a Flow<T>.
Where to go from here?
Congratulations! In this chapter, you had the opportunity to apply concepts you learned in the previous chapters in a concrete example that allowed you to fetch information about your favorite TV shows. You’ve learned how to create a Sequence<T> and how to use a Flow<T> in an environment of concurrency. Finally, you’ve empowered your functional thinking by implementing abstractions for composing suspendable functions that return a Result<T>. It’s been a lot of work and also a lot of fun!