Home Android & Kotlin Books Reactive Programming with Kotlin

2
Observables Written by Alex Sullivan & Scott Gardner

Now that you’re all setup with RxJava, it’s time to jump in and start building some observables!

In this chapter, you’re going to go over a few different examples of creating and subscribing to observables. Things are going to be pretty theoretical for now, but rest assured that the skills you pick up in this chapter will come in very handy as you start working through real-world projects.

Getting started

You’ll work through these theoretical examples of observables using a normal IntelliJ IDEA project. You’ll move on to Android Studio projects once you switch to working on real-world Android applications.

Use the File ▸ Open command in IntelliJ IDEA to open the root folder of the starter project. Accept the defaults in any pop-ups that occur, and the project will then be opened. You’ll primarily be working in the main.kt file in the src/main/kotlin folder of the project. For now, there’s just an empty main() function. You’ll fill it out as you progress through the chapter.

Before you start diving into some RxJava code, take a look at the SupportCode.kt file. It contains the following helper function exampleOf(description: String, action: () -> Unit):

fun exampleOf(description: String, action: () -> Unit) {
  println("\n--- Example of: $description ---")
  action()
}

You’ll use this function to encapsulate different examples as you work your way through this chapter. You’ll see how to use this function shortly.

But, before you get too deep into that, now would probably be a good time to answer the question: What is an observable?

Observables are the heart of Rx. You’re going to spend some time discussing what observables are, how to create them and how to use them.

What is an observable?

You’ll see “observable,” “observable sequence,” and “stream” used interchangeably in Rx. And, really, they’re all the same thing. In RxJava, everything is a sequence…

…or something that works with a sequence. And an Observable is just a sequence with special powers. One of them, in fact the most important one, is that it is asynchronous. Observables produce events, the process of which the library refers to as emitting, over a period of time. Events can contain values, such as numbers or instances of a custom type, or they can be recognized user gestures, such as taps.

One of the best ways to conceptualize this is by using marble diagrams, which are values plotted on a timeline.

The left-to-right arrow represents time, and the numbered circles represent elements of a sequence. The observable will emit element 1, some time will pass, and then it will emit 2 and 3. How much time, you ask? It could be at any point throughout the life of the observable — which brings you to the lifecycle of an observable.

Lifecycle of an observable

In the previous marble diagram, the observable emitted three elements. When an observable emits an element, it does so in what’s known as a next event.

Here’s another marble diagram, this time including a vertical bar that represents the end of the road for this observable.

This observable emits three tap events, and then it ends. This is called a complete event, as the sequence has now terminated. For example, perhaps the taps were on a view that had been dismissed. The important thing is that the observable has terminated, and it can no longer emit anything. This is normal termination.

However, sometimes things can go wrong.

An error has occurred in this marble diagram; it’s represented by the red X. The observable emitted an error event containing the error. This is no different than when an observable terminates normally with a complete event. If an observable emits an error event, it is also terminated and can no longer emit anything else.

Here’s a quick recap:

  • An observable emits next events that contain elements. It can continue to do this until it either:
  • …emits a complete event, which terminates it.
  • …emits an error event, which terminates it.
  • Once an observable is terminated, it can no longer emit events.

Now that you understand what an observable is and what it does, you’ll create some observables to see them in action.

Creating observables

Switch back from the current file to main.kt and add the code below to the main() function. You’ll also need to include the import io.reactivex.rxjava3.core.Observable:

exampleOf("just") {  
  val observable: Observable<Int> = Observable.just(1)
}

In the code above, you used the just static method to create an observable with just one item: the Integer 1.

In Rx, methods that operate on observables are referred to as operators — so you just utilized the just operator.

just is aptly named, since all it does is create an observable sequence containing just the provided elements. just can take more than one item as well — try updating the previous line to take in a few more items:

val observable = Observable.just(1,2,3)

This time, you didn’t explicitly specify the type. You might think that because you gave it several integers, the type is Observable<List<Int>>. However, if you hover over the Observable.just(1,2,3) expression and click View ▸ Expression Type you’ll see that the type is actually Observable<Int>.

just has ten overloaded methods that take a variable number of arguments, each of which are eventually emitted by the observable. If you want to create an observable of type Observable<List<Int>>, then you can pass a List<Int> into the just operator. Replace the observable you previously defined with the following:

val observable = Observable.just(listOf(1))

Now, hover over the Observable.just(listOf(1)) expression and click View ▸ Expression Type again. You’ll see that the type is now Observable<List<Int>>. That means that this new observable will emit one item — and that single item will be a list of Int values. It can be a little tough to wrap your mind around an observable that emits lists, but with time it will become second nature.

Another operator you can use to create observables is fromIterable. Add this code to the bottom of the main() function:

exampleOf("fromIterable") {
  val observable: Observable<Int> =
    Observable.fromIterable(listOf(1, 2, 3))
}

The fromIterable operator creates an observable of individual objects from a regular list of elements. That is, it takes all of the items in the provided list and emits those elements as if you had instead written Observable.just(1, 2, 3).

Hover over the Observable.fromIterable(listOf(1, 2, 3)) expression and click View ▸ Expression Type again. You’ll see that the type of this observable is Observable<Int> rather than Observable<List<Int>>.

fromIterable can be handy if you have a list of objects you want to convert into an observable sequence.

The IntelliJ IDEA console is probably looking pretty bare at the moment if you’ve run this code. That’s because you haven’t printed anything except the example header. Time to change that by subscribing to observables.

Subscribing to observables

As an Android developer, you may be familiar with LocalBroadcastManager; it broadcasts notifications to observers, which are different than RxJava Observables. Here’s an example of of a broadcast receiver that listens for a custom-event Intent:

LocalBroadcastManager.getInstance(this)
    .registerReceiver(object : BroadcastReceiver() {
  override fun onReceive(context: Context?, intent: Intent?) {
    println("We got an intent!")
  }
}, IntentFilter("custom-event"))

Subscribing to an RxJava observable is similar; you call observing an observable subscribing to it. So instead of registerReceiver(), you use subscribe(). Unlike LocalBroadcastManager, where developers typically use only the getInstance() singleton instance, each observable in Rx is different.

More importantly, an observable won’t send events until it has a subscriber. Remember that an observable is really a sequence definition; subscribing to an observable is more like calling next() on an Iterator in the Kotlin Standard Library:

val sequence = 0 until 3
val iterator = sequence.iterator()
while (iterator.hasNext()) {
  println(iterator.next())
}

/* Prints:
0
1
2
*/

Subscribing to observables is more streamlined than this, though. You can also add handlers for each event type an observable can emit. Recall that an observable emits next, error, and complete events. A next event passes the emitted element to the handler, and an error event contains a throwable instance.

To see this in action, add this new example to the IntelliJ project (insert the code somewhere after the closing curly bracket of the previous example):

exampleOf("subscribe") {
  val observable = Observable.just(1, 2, 3)
}

This is similar to the previous example, except, this time, you’re simply using the just operator. Now add this code at the bottom of this example’s lambda, to subscribe to the observable:

observable.subscribe { println(it) }

Cmd-click on the subscribe operator, and you’ll see that it takes a Consumer of type Int as a parameter. Consumer is a simple interface that has one method, accept(), which takes a value and returns nothing. You’ll also see that subscribe returns a Disposable. You’ll cover disposables shortly.

Run your main() function. The result of this subscription is that each event emitted by the observable prints out:

--- Example of: subscribe ---
1
2
3

Note: The console should automatically appear whenever you run the project, but you can manually show it by clicking the Run tab in the bottom left of the IntelliJ IDEA window after you run the main() function. You can also select View ▸ Tool Windows ▸ Run. This is where the println statements display their output.

You’ve seen how to create observables of one element and of many elements. But what about an observable of zero elements? The empty operator creates an empty observable sequence with zero elements; it will only emit a complete event.

Add this new example to the project:

exampleOf("empty") {
  val observable = Observable.empty<Unit>()
}

An observable must be defined as a specific type if it can’t be inferred. So, since empty has nothing from which to infer the type, the type must be defined explicitly. In this case, Unit is as good as anything else. Add this code to the example to subscribe to it, importing io.reactivex.rxjava3.kotlin.subscribeBy to resolve the compile errors:

observable.subscribeBy(
  // 1
  onNext = { println(it) },
  // 2
  onComplete = { println("Completed") }
)

You’re using a new subscribeBy method here instead of the subscribe method you used previously. subscribeBy is a handy extension method defined in the RxKotlin library, which we’ll touch on later in the book. Unlike the subscribe method you used previously, subscribeBy lets you explicitly state what event you want to handle — onNext, onComplete, or onError. If you were to only supply the onNext field of subscribeBy, you’d be recreating the subscribe functionality you used above.

Taking each numbered comment in turn:

  1. Explicitly handle the next event by printing the carried value, just like before.
  2. A complete event doesn’t carry any value, so just print “Completed” instead.

Run this new example. In the console, you’ll see that empty only emits the completed event which makes the code print “Completed”:

--- Example of: empty ---
Completed

But what use is an empty observable? Well, they’re handy when you want to return an observable that immediately terminates or intentionally has zero values. As opposed to the empty operator, the never operator creates an observable that doesn’t emit anything and never terminates. It can be used to represent an infinite duration. Add this example to the project:

exampleOf("never") {
  val observable = Observable.never<Any>()

  observable.subscribeBy(
      onNext = { println(it) },
      onComplete = { println("Completed") }
  )
}

Nothing is printed, except for the example header. Not even “Completed”. How do you know if this is even working? Hang on to that inquisitive spirit until the Challenges section of this chapter.

So far, you’ve been working mostly with observables of explicit variables, but it’s also possible to generate an observable from a range of values.

Add this example to the project:

exampleOf("range") {
  // 1
  val observable: Observable<Int> = Observable.range(1, 10)

  observable.subscribe {
    // 2
    val n = it.toDouble()
    val fibonacci = ((Math.pow(1.61803, n) -
            Math.pow(0.61803, n)) /2.23606).roundToInt()
    println(fibonacci)
  }
}

Taking it section by section:

  1. Create an observable using the range operator, which takes a start integer value and a count of sequential integers to generate.
  2. Calculate and print the nth Fibonacci number for each emitted element.

Note: The Fibonacci sequence is generated by adding each of the previous two numbers in the sequence, starting with 0 and 1: 0, 1, 1, 2, 3, 5, 8, …

There’s actually a better place than in the subscribe method, to put code that transforms the emitted element. You’ll learn about that in Chapter 7, “Transforming Operators.”

Except for the never() example, up to this point, you’ve been working with observables that automatically emit a completed event and naturally terminate. This permitted you to focus on the mechanics of creating and subscribing to observables, but that swept an important aspect of subscribing to observables under the rug.

It’s time to do some housekeeping and deal with that aspect before moving on.

Disposing and terminating

Remember that an observable doesn’t do anything until it receives a subscription. It’s the subscription that triggers an observable to begin emitting events, up until it emits an error or completed event and is terminated. You can manually cause an observable to terminate by canceling a subscription to it.

Add this new example to the project:

exampleOf("dispose") {
  // 1
  val mostPopular: Observable<String> =
          Observable.just("A", "B", "C")
  // 2
  val subscription = mostPopular.subscribe {
    // 3
    println(it)
  }
}

Quite simply:

  1. Create an observable of strings.
  2. Subscribe to the observable, this time saving the returned Disposable as a local constant called subscription.
  3. Print each emitted event in the handler.

To explicitly cancel a subscription, call dispose() on it. After you cancel the subscription, or dispose of it, the observable in the current example will stop emitting events.

Add this code to the bottom of the example:

subscription.dispose()

Managing each subscription individually would be tedious, so RxJava includes a CompositeDisposable type. A CompositeDisposable holds disposables — typically added using the add() method — and will call dispose() on all of them when you call dispose() on the CompositeDisposable itself. Add this new example to the project. You’ll need to import io.reactivex.rxjava3.disposables.CompositeDisposable:

exampleOf("CompositeDisposable") {
  // 1
  val subscriptions = CompositeDisposable()
  // 2
  val disposable = Observable.just("A", "B", "C")
      .subscribe {
        // 3
        println(it)
      }
  // 4
  subscriptions.add(disposable)
  // 5
  subscriptions.dispose()
}

Here’s how this disposable code works:

  1. Create a CompositeDisposable.
  2. Create an observable and disposable.
  3. Subscribe to the observable and print out the emitted item.
  4. Add the Disposable return value from subscribe to the subscriptions CompositeDisposable.
  5. Dispose of the disposables.

This is the pattern you’ll use most frequently: creating and subscribing to an observable and immediately adding the subscription to a CompositeDisposable.

Why bother with disposables at all? If you forget to call dispose() on a Disposable when you’re done with the subscription, or in some other way cause the observable to terminate at some point, you will probably leak memory.

If you forget to utilize the Disposable returned by calling subscribe on an Observable, Android Studio will make it very clear that something is not right in an Android project!

Imagine leaking an huge view hierarchy just because you forgot to unsubscribe from a long running observable that you don’t even need anymore!

The create operator

In the previous examples, you’ve created observables with specific next event elements. Another way to specify all events that an observable will emit to subscribers is by using the create operator.

Add this new example to the project:

exampleOf("create") {

  val disposables = CompositeDisposable()

  Observable.create<String> { emitter ->

  }
}

The create operator takes a single parameter named source. Its job is to provide the implementation of calling subscribe on the observable. In other words, it defines all the events that will be emitted to subscribers. Command-click on create to see it’s definition:

The source parameter is an ObservableOnSubscribe<T>. ObservableOnSubscribe is a SAM (Single Abstract Method) interface that exposes one method — subscribe. That subscribe method takes in an Emitter<T>, which has a few methods that you’ll use to build up the actual Observable. Specifically, it has onNext, onComplete, and onError methods that you can invoke.

Change the implementation of create to the following:

Observable.create<String> { emitter ->
    // 1
    emitter.onNext("1")

    // 2
    emitter.onComplete()

    // 3
    emitter.onNext("?")
}

Here’s the play by play:

  1. Emit the string 1 via the onNext method.
  2. Emit a completed event.
  3. Emit another string ? via the onNext method again.

Do you think the second onNext element (?) could ever be emitted to subscribers? Why or why not?

To see if you guessed correctly, subscribe to the observable by adding the following code on the next line after the create implementation:

.subscribeBy(
    onNext = { println(it) },
    onComplete = { println("Completed") },
    onError = { println(it) }
)

You’ve subscribed to the observable, now run the code. The result is that the first next event element and “Completed” print out. The second next event doesn’t print because the observable emitted a completed event and terminated before it.

 --- Example of: create ---
1
Completed

Add the following line of code between the emitter.onNext and emitter.onComplete calls:

 emitter.onError(RuntimeException("Error"))

Run the code after you’ve made those changes. The observable emits the error and then is terminated.

--- Example of: create ---
1
Error

What would happen if you emitted neither a completed nor an error event? Comment out the onComplete and onError lines of code to find out. Here’s the complete implementation:

exampleOf("create") {
  Observable.create<String> { emitter ->
    // 1
    emitter.onNext("1")

//    emitter.onError(RuntimeException("Error"))
    // 2
//    emitter.onComplete()

    // 3
    emitter.onNext("?")
  }.subscribeBy(
      onNext = { println(it) },
      onComplete = { println("Completed") },
      onError = { println("Error") }
  )
}

Run those changes. Congratulations, you’ve just leaked memory! :] The observable will never finish, and since you never disposed of the Disposable returned by Observable.create the sequence will never be canceled.

 --- Example of: create ---
1
?

Feel free to uncomment the line adding the complete event or dispose of the returned Disposable if you can’t stand leaving the code in a leaky state.

Creating observable factories

Rather than creating an observable that waits around for subscribers, it’s possible to create observable factories that vend a new observable to each subscriber.

Add this new example to the project:

exampleOf("defer") {

  val disposables = CompositeDisposable()
  // 1
  var flip = false
  // 2
  val factory: Observable<Int> = Observable.defer {
    // 3
    flip = !flip
    // 4
    if (flip) {
      Observable.just(1, 2, 3)
    } else {
      Observable.just(4, 5, 6)
    }
  }
}

Here’s the explanation:

  1. Create a Boolean flag to flip which observable to return.
  2. Create an observable of Int factory using the defer operator.
  3. Invert flip, which will be used each time factory is subscribed to.
  4. Return different observables based on whether flip is true or false.

Externally, an observable factory is indistinguishable from a regular observable. Add this code to the bottom of the example to subscribe to factory four times:

for (i in 0..3) {
  disposables.add(
      factory.subscribe {
        println(it)
      }
  )
}

disposables.dispose()

Run this code. Each time you subscribe to factory, you get the opposite observable. You get 123, then 456, and the pattern repeats each time a new subscription is created:

 --- Example of: defer ---
1
2
3
4
5
6
1
2
3
4
5
6

Using other observable types

In addition to the normal Observable type, there are a few other types of observables with a narrower set of behaviors than regular observables. Their use is optional; you can use a regular observable anywhere you might use one of these specialized observables. Their purpose is to provide a way to more clearly convey your intent to readers of your code or consumers of your API. The context implied by using them can help make your code more intuitive.

There are three special types of observables in RxJava: Single, Maybe and Completable. Without knowing anything more about them yet, can you guess how each one is specialized?

  • Singles will emit either a success(value) or error event. success(value) is actually a combination of the next and completed events. This is useful for one-time processes that will either succeed and yield a value or fail, such as downloading data or loading it from disk.

  • A Completable will only emit a completed or error event. It doesn’t emit any value. You could use a Completable when you only care that an operation completed successfully or failed, such as a file write.

  • And Maybe is a mash-up of a Single and Completable. It can either emit a success(value), completed, or error. If you need to implement an operation that could either succeed or fail, and optionally return a value on success, then Maybe is your ticket.

You’ll have an opportunity to work more with these special observable types in Chapter 4, “Observables and Subjects in Practice,” and beyond. For now, you’ll run through a basic example of using a Single to load some text from a text file named Copyright.txt, because who doesn’t love some legalese once in a while? This file is in the src folder of the project.

Add this example to main(), importing io.reactivex.rxjava3.core.Single when you do:

exampleOf("Single") {
  // 1
  val subscriptions = CompositeDisposable()
  // 2
  fun loadText(filename: String): Single<String> {
    // 3
    return Single.create create@{ emitter ->

    }
  }
}

Here’s what you do in this code:

  1. Create a composite disposable to use later.
  2. Implement a function to load text from a file on disk that returns a Single.
  3. Create and return a Single.

Add this code inside the create lambda to complete the implementation:

// 1
val file = File(filename)
// 2
if (!file.exists()) {
  emitter.onError(FileNotFoundException("Can’t find $filename"))
  return@create
}
// 3
val contents = file.readText(Charsets.UTF_8)
// 4
emitter.onSuccess(contents)

From the top:

  1. Create a new File from the filename.
  2. If the file doesn’t exist, emit a FileNotFoundException via the onError method and return from the create method.
  3. Get the data from the file.
  4. Emit the contents of the file.

Now you can put this function to work. Add this code to the example:

// 1
val observer = loadText("Copyright.txt")
    // 2
    .subscribeBy(
        // 3
        onSuccess = { println(it) },
        onError = { println("Error, $it") }
    )

subscriptions.add(observer)

Here, you:

  1. Call loadText(), passing the root name of the text file.
  2. Subscribe to the Single it returns.
  3. Pass onSuccess and onError lambdas to the subscribeBy method, either printing the contents of the file or printing the error.

Run the example, and you should see the text from the file printed to the console, the same as the copyright comment at the top of the project:

 --- Example of: Single ---
Copyright (c) 2014-2020 Razeware LLC
...

Try changing the filename to something else, and you should get the file not found exception printed instead.

Challenges

Practice makes permanent. By completing challenges in this book, you’ll practice what you’ve learned in each chapter and pick up a few more tidbits of knowledge about working with observables. A starter project as well as a finished version are provided for each challenge. Enjoy!

Challenge: Perform side effects

In the never operator example earlier, nothing printed out. That was before you were adding your subscriptions to composite disposables, but if you had added it to one, you could’ve used a handy operator to print a message when the disposable was disposed.

Operators that begin with doOn, such as the doOnDispose operator, allows you to insert side effects; that is, you add handlers that take some action but that won’t affect the observable. For doOnDispose, that is whenever the disposable is disposed of.

There’s a few other handy doOn methods that you can use. There’s a doOnNext method, a doOnComplete method, a doOnError method and a doOnSubscribe method that you can also use to perform some side effect at the right moment.

To complete this challenge, insert the doOnSubscribe operator in the never example. Feel free to include any of the other handlers if you’d like; they work just like doOnSubscribe’s handler does.

And while you’re at it, create a composite disposable and add the subscription to it.

Don’t forget you can always peek into the finished challenge project for “inspiration.”

Key points

  • Everything is a sequence in RxJava, and the primary sequence type is Observable.
  • Observables start emitting when they are subscribed to.
  • You must dispose of subscriptions when done with them, and you’ll often use a CompositeDisposable to do so.
  • Single, Completable and Maybe are specialized observable types that are handy in certain situations.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.

Have feedback to share about the online reading experience? If you have feedback about the UI, UX, highlighting, or other features of our online readers, you can send them to the design team with the form below:

© 2021 Razeware LLC