Reactive Programming with RxAndroid in Kotlin: An Introduction

Learn about how Reactive programming is a whole new paradigm using RxJava and RxAndroid in Android with Kotlin. By Kyle Jablonski.

Leave a rating/review
Download materials
Save for later
Share
You are currently viewing page 2 of 4 of this article. Click here to view the first page.

How to Create an Observable

There are many libraries to help you create observables from almost any type of event. However, sometimes you just need to roll your own. Besides, it’s a great way to learn about the Observable pattern and reactive programming!

You’ll create an Observable using Observable.create(). Here is its signature:

Observable<T> create(ObservableOnSubscribe<T> source)

That’s nice and concise, but what does it mean? What is the “source?” To understand that signature, you need to know what an ObservableOnSubscribe is. It’s an interface, with this contract:

public interface ObservableOnSubscribe<T> {
  void subscribe(ObservableEmitter<T> e) throws Exception;
}

Like an episode of a J.J. Abrams show like “Lost” or “Westworld,” that answers some questions while inevitably asking more. So the “source” you need to create your Observable will need to expose subscribe(), which in turn requires whatever’s calling it to provide an “emitter” as a parameter. What, then, is an emitter?

RxJava’s Emitter interface is similar to the Observer one:

public interface Emitter<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

An ObservableEmitter, specifically, also provides a means to cancel the subscription.

To visualize this whole situation, think of a water faucet regulating the flow of water. The water pipes are like an Observable, willing to deliver a flow of water if you have a means of tapping into it. You construct a faucet that can turn on and off, which is like an ObservableEmitter, and connect it to the water pipes in Observable.create(). The outcome is a nice fancy faucet. And of course, the faucet is reactive, since once you close it, the stream of water – data – is no longer active. :]

An example will make the situation less abstract and more clear. It’s time to create your first observable!

Observe Button Clicks

Add the following code inside the CheeseActivity class:

// 1
private fun createButtonClickObservable(): Observable<String> {
  // 2
  return Observable.create { emitter ->
    // 3
    searchButton.setOnClickListener {
      // 4
      emitter.onNext(queryEditText.text.toString())
    }

    // 5
    emitter.setCancellable {
      // 6
      searchButton.setOnClickListener(null)
    }
  }
}

Your imports should look as follows after entering the above code:

import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_cheeses.*

You’ve imported the correct Observable class and you’re using the Kotlin Android Extensions to get references to view objects.

Here’s what’s going on in the code above:

  1. You declare a function that returns an observable that will emit strings.
  2. You create an observable with Observable.create(), and supply it with a new ObservableOnSubscribe.
  3. Set up an OnClickListener on searchButton.
  4. When the click event happens, call onNext on the emitter and pass it the current text value of queryEditText.
  5. Keeping references can cause memory leaks in Java or Kotlin. It’s a useful habit to remove listeners as soon as they are no longer needed. But what do you call when you are creating your own Observable? For that very reason, ObservableEmitter has setCancellable(). Override cancel(), and your implementation will be called when the Observable is disposed, such as when the Observable is completed or all Observers have unsubscribed from it.
  6. For OnClickListener, the code that removes the listener is setOnClickListener(null).

Now that you’ve defined your Observable, you need to set up the subscription to it. Before you do, you need to learn about one more interface, Consumer. It’s a simple way to accept values coming in from an emitter.

public interface Consumer<T> {
  void accept(T t) throws Exception;
}

This interface is handy when you want to set up a simple subscription to an Observable.

The Observable interface requires several versions of subscribe(), all with different parameters. For example, you could pass a full Observer if you like, but then you’d need to implement all the necessary methods.

If all you need out of your subscription is for the observer to respond to values sent to onNext(), you can use the version of subscribe() that takes in a single Consumer (the parameter is even named onNext, to make the connection clear).

You’ll do exactly that when you subscribe in your activity’s onStart(). Add the following code to CheeseActivity.kt:

override fun onStart() {
  super.onStart()
  // 1
  val searchTextObservable = createButtonClickObservable()

  searchTextObservable
      // 2
      .subscribe { query ->
        // 3
        showResult(cheeseSearchEngine.search(query))
      }
}

Here’s an explanation of each step:

  1. First, create an observable by calling the method you just wrote.
  2. Subscribe to the observable with subscribe(), and supply a simple Consumer.
  3. Finally, perform the search and show the results.

Build and run the app. Enter some letters and tap the Search button. After a simulated delay (see CheeseSearchEngine), you should see a list of cheeses that match your request:

Sounds yummy! :]

RxJava Threading Model

You’ve had your first taste of reactive programming. There is one problem though: the UI freezes up for a few seconds when the search button is tapped.

You might also notice the following line in Android Monitor:

> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames!  The application may be doing too much work on its main thread.

This happens because search is executed on the main thread. If search were to perform a network request, Android will crash the app with a NetworkOnMainThreadException exception. It’s time to fix that.

One popular myth about RxJava is that it is multi-threaded by default, similar to AsyncTask. However, if not otherwise specified, RxJava does all the work in the same thread it was called from.

You can change this behavior with the subscribeOn and observeOn operators.

subscribeOn is supposed to be called only once in the chain of operators. If it’s not, the first call wins. subscribeOn specifies the thread on which the observable will be subscribed (i.e. created). If you use observables that emit events from an Android View, you need to make sure subscription is done on the Android UI thread.

On the other hand, it’s okay to call observeOn as many times as you want in the chain. observeOn specifies the thread on which the next operators in the chain will be executed. For example:

myObservable // observable will be subscribed on i/o thread
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .map { /* this will be called on main thread... */ }
      .doOnNext{ /* ...and everything below until next observeOn */ }
      .observeOn(Schedulers.io())
      .subscribe { /* this will be called on i/o thread */ }

The most useful schedulers are:

  • Schedulers.io(): Suitable for I/O-bound work such as network requests or disk operations.
  • Schedulers.computation(): Works best with computational tasks like event-loops and processing callbacks.
  • AndroidSchedulers.mainThread() executes the next operators on the UI thread.
Kyle Jablonski

Contributors

Kyle Jablonski

Author

Filip Babić

Tech Editor and Final Pass Editor

Tyler Bos

Editor

Over 300 content creators. Join our team.