Reactive Programming with RxAndroid in Kotlin: An Introduction

Irina Galata

Update note: This tutorial has been updated to Kotlin, Android 26 (Oreo), and Android Studio 3.0 Beta 5 by Irina Galata. The original tutorial was written by Artem Kholodnyi.

AndroidReactive-featureThey say you should develop a proactive mindset in life, not a reactive one. That does not apply to Android programming, however! :]

Reactive programming is not just another API. It’s a whole new paradigm and a very useful one. RxJava is a reactive implementation used on Android. Android is a perfect place to start your exploration of the reactive world. It’s made even easier with RxAndroid, a library that wraps asynchronous UI events to be more RxJava like.

Don’t be scared — I’ll bet the basic concept of reactive programming is known to you even if you are not aware of it yet. :]

Note: This tutorial requires good knowledge of Android and Kotlin. To get up to speed, check out our Android Development Tutorials first and return to this tutorial when you’re ready.

In this RxAndroid tutorial you will learn how to do the following:

  • understand what Reactive Programming is
  • define an observable
  • turn asynchronous events like button clicks and text field context changes into observables
  • transform observable items
  • filter observable items
  • specify the thread on which code should be executed
  • combine several observables into one

I hope you like cheese — because you’re going to build a cheese-finding app as you learn the concepts above! :]

Getting Started

Download the starter project for this tutorial and open it in Android Studio 3.0 Beta 5 or above.

You’ll be working exclusively in CheeseActivity.kt. The CheeseActivity class extends BaseSearchActivity; take some time to explore BaseSearchActivity and check out the following features ready for your use:

  • showProgress(): A function to show a progress bar…
  • hideProgress(): … and a function to hide it.
  • showResult(result: List): A function to display a list of cheeses.
  • cheeseSearchEngine: A field which is an instance of CheeseSearchEngine. It has a search function which you call when you want to search for cheeses. It accepts a text search query and returns a list of matching cheeses.

Build and run the project on your Android device or emulator. You should see a gloriously empty search screen:

starter-300x500

What is Reactive Programming?

Before creating your first observable, indulge yourself with a bit of a theory first. :]

In imperative programming, an expression is evaluated once and a value is assigned to a variable:

var x = 2
var y = 3
var z = x * y // z is 6

x = 10
// z is still 6

On the other hand, reactive programming is all about responding to value changes.

You have probably done some reactive programming — even if you didn’t realize it at the time.

  • Defining cell values in spreadsheets is similar to defining variables in imperative programming.
  • Defining cell expressions in spreadsheets is similar to defining and operating on observables in reactive programming.

Take the following spreadsheet that implements the example from above:

The spreadsheet assigns cell B1 with a value of 2, cell B2 with a value of 3 and a third cell, B3, with an expression that multiplies the value of B1 by the value of B2. When the value of either of the the components referenced in the expression changes, the change is observed and the expression is re-evaluated automagically in B3:

Difference between RxJava and RxKotlin

As you probably know, it’s possible to use Java libraries in Kotlin projects thanks to Kotlin’s language compatibility with Java. If that’s the case, then why was RxKotlin created in the first place? RxKotlin is a Kotlin wrapper around RxJava, which also provides plenty of quite useful extension functions. Effectively, RxKotlin makes working with RxJava more Kotlin-y.

In this article, we’ll focus on using RxJava, since it’s critical to understand the core concepts of this approach, however everything you will learn applies to RxKotlin as well.

Note: Take a look at the build.gradle file and the project dependencies especially. Except for the UI libraries, it contains RxKotlin and RxAndroid packages. We don’t need to specify RxJava here explicitly since RxKotlin already contains it.

RxJava Observable Contract

RxJava make use of the Observer pattern.

Note: To refresh your memory about the Observer pattern you can visit Common Design Patterns for Android with Kotlin.

In the Observer pattern, you have objects that implement two key RxJava interfaces: Observable and Observer. When an Observable changes state, all Observer objects subscribed to it are notified.

Among the methods in the Observable interface is subscribe(), which an Observer will call to begin the subscription.

From that point, the Observer interface has three methods which the Observable calls as needed:

  • onNext(T value) provides a new item of type T to the Observer
  • onComplete() notifies the Observer that the Observable has finished sending items
  • onError(Throwable e) notifies the Observer that the Observable has experienced an error

As a rule, a well-behaved Observable emits zero or more items that could be followed by either completion or error.

That sounds complicated, but some marble diagrams may clear things up.

network-request

The circle represents an item that has been emitted from the observable and the black block represents a completion or error. Take, for example, a network request observable. The request usually emits a single item (response) and immediately completes.

A mouse movement observable would emit mouse coordinates but will never complete:

mouse-coords

Here you can see multiple items that have been emitted but no block showing the mouse has completed or raised an error.

No more items can be emitted after an observable has completed. Here’s an example of a misbehaving observable that violates the Observable contract:

misbehaving-stream

That’s a bad, bad observable because it violates the Observable contract by emitting an item after it signaled completion.

How to Create an Observable

There are many libraries to help you create observables from almost any type of event. However, sometimes you just need to roll your own. Besides, it’s a great way to learn!

You’ll create an Observable using Observable.create(). Here is its signature:

Observable<T> create(ObservableOnSubscribe<T> source)

That’s nice and concise, but what does it mean? What is the “source?” To understand that signature, you need to know what an ObservableOnSubscribe is. It’s an interface, with this contract:

public interface ObservableOnSubscribe<T> {
  void subscribe(ObservableEmitter<T> e) throws Exception;
}

Like an episode of a J.J. Abrams show like “Lost” or “Westworld,” that answers some questions while inevitably asking more. So the “source” you need to create your Observable will need to expose subscribe(), which in turn requires whatever’s calling it to provide an “emitter” as a parameter. What, then, is an emitter?

RxJava’s Emitter interface is similar to the Observer one:

public interface Emitter<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

An ObservableEmitter, specifically, also provides a means to cancel the subscription.

To visualize this whole situation, think of a water faucet regulating the flow of water. The water pipes are like an Observable, willing to deliver a flow of water if you have a means of tapping into it. You construct a faucet that can turn on and off, which is like an ObservableEmitter, and connect it to the water pipes in Observable.create(). The outcome is a nice fancy faucet. :]

An example will make the situation less abstract and more clear. It’s time to create your first observable! :]

Observe Button Clicks

Add the following code inside the CheeseActivity class:

// 1
private fun createButtonClickObservable(): Observable<String> {
  // 2
  return Observable.create { emitter ->
    // 3
    searchButton.setOnClickListener {
      // 4
      emitter.onNext(queryEditText.text.toString())
    }

    // 5
    emitter.setCancellable {
      // 6
      searchButton.setOnClickListener(null)
    }
  }
}

Your imports should look as follows after entering the above code:

import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_cheeses.*

You’ve imported the correct Observable class and you’re using the Kotlin Android Extensions to get references to view objects.

Here’s what’s going on in the code above:

  1. You declare a function that returns an observable that will emit strings.
  2. You create an observable with Observable.create(), and supply it with a new ObservableOnSubscribe.
  3. Set up an OnClickListener on searchButton.
  4. When the click event happens, call onNext on the emitter and pass it the current text value of queryEditText.
  5. Keeping references can cause memory leaks in Java or Kotlin. It’s a useful habit to remove listeners as soon as they are no longer needed. But what do you call when you are creating your own Observable? For that very reason, ObservableEmitter has setCancellable(). Override cancel(), and your implementation will be called when the Observable is disposed, such as when the Observable is completed or all Observers have unsubscribed from it.
  6. For OnClickListener, the code that removes the listener is setOnClickListener(null).

Now that you’ve defined your Observable, you need to set up the subscription to it. Before you do, you need to learn about one more interface, Consumer. It’s a simple way to accept values coming in from an emitter.

public interface Consumer<T> {
  void accept(T t) throws Exception;
}

This interface is handy when you want to set up a simple subscription to an Observable.

The Observable interface requires several versions of subscribe(), all with different parameters. For example, you could pass a full Observer if you like, but then you’d need to implement all the necessary methods.

If all you need out of your subscription is for the observer to respond to values sent to onNext(), you can use the version of subscribe() that takes in a single Consumer (the parameter is even named onNext, to make the connection clear).

You’ll do exactly that when you subscribe in your activity’s onStart(). Add the following code to CheeseActivity.kt:

override fun onStart() {
  super.onStart()
  // 1
  val searchTextObservable = createButtonClickObservable()

  searchTextObservable
      // 2
      .subscribe { query ->
        // 3
        showResult(cheeseSearchEngine.search(query))
      }
}

Here’s an explanation of each step:

  1. First, create an observable by calling the method you just wrote.
  2. Subscribe to the observable with subscribe(), and supply a simple Consumer.
  3. Finally, perform the search and show the results.

Build and run the app. Enter some letters and press the Search button. After a simulated delay (see CheeseSearchEngine), you should see a list of cheeses that match your request:

enter-and-press-300x500

Sounds yummy! :]

RxJava Threading Model

You’ve had your first taste of reactive programming. There is one problem though: the UI freezes up for a few seconds when the search button is pressed.

You might also notice the following line in Android Monitor:

> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames!  The application may be doing too much work on its main thread.

This happens because search is executed on the main thread. If search were to perform a network request, Android will crash the app with a NetworkOnMainThreadException exception. It’s time to fix that.

One popular myth about RxJava is that it is multi-threaded by default, similar to AsyncTask. However, if not otherwise specified, RxJava does all the work in the same thread it was called from.

You can change this behavior with the subscribeOn and observeOn operators.

subscribeOn is supposed to be called only once in the chain of operators. If it’s not, the first call wins. subscribeOn specifies the thread on which the observable will be subscribed (i.e. created). If you use observables that emit events from an Android View, you need to make sure subscription is done on the Android UI thread.

On the other hand, it’s okay to call observeOn as many times as you want in the chain. observeOn specifies the thread on which the next operators in the chain will be executed. For example:

myObservable // observable will be subscribed on i/o thread
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .map { /* this will be called on main thread... */ }
      .doOnNext{ /* ...and everything below until next observeOn */ }
      .observeOn(Schedulers.io())
      .subscribe { /* this will be called on i/o thread */ }

The most useful schedulers are:

  • Schedulers.io(): Suitable for I/O-bound work such as network requests or disk operations.
  • Schedulers.computation(): Works best with computational tasks like event-loops and processing callbacks.
  • AndroidSchedulers.mainThread() executes the next operators on the UI thread.

The Map Operator

The map operator applies a function to each item emitted by an observable and returns another observable that emits results of those function calls. You’ll need this to fix the threading issue as well.

If you have an observable called numbers that emits the following:

map-0

And if you apply map as follows:

numbers.map { number -> number * number }

The result would be the following:

map-1

That’s a handy way to iterate over multiple items with little code. Let’s put it to use!

Modify onStart() in CheeseActivity class to look like the following:

override fun onStart() {
  super.onStart()

  val searchTextObservable = createButtonClickObservable()

  searchTextObservable
      // 1
      .subscribeOn(AndroidSchedulers.mainThread())
      // 2
      .observeOn(Schedulers.io())
      // 3
      .map { cheeseSearchEngine.search(it) }
      // 4
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe {
        showResult(it)
      }
}

Going over the code above:

  1. First, specify that code down the chain should start on the main thread instead of on the I/O thread. In Android, all code that works with Views should execute on the main thread.
  2. Specify that the next operator should be called on the I/O thread.
  3. For each search query, you return a list of results.
  4. Finally, make sure that the results are passed to the list on the main thread

Build and run your project. Now the UI should be responsive even when a search is in progress.

Show Progress Bar with doOnNext

It’s time to display the progress bar!

For that you’ll need a doOnNext operator. doOnNext takes a Consumer and allows you do something each time an item is emitted by observable.

In the same CheeseActivity class modify onStart() to the following:

override fun onStart() {
  super.onStart()

  val searchTextObservable = createButtonClickObservable()

  searchTextObservable
      // 1
      .observeOn(AndroidSchedulers.mainThread())
      // 2
      .doOnNext { showProgress() }
      .observeOn(Schedulers.io())
      .map { cheeseSearchEngine.search(it) }
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe {
        // 3
        hideProgress()
        showResult(it)
      }
}

Taking each numbered comment in turn:

  1. Ensure that the next operator in chain will be run on the main thread.
  2. Add the doOnNext operator so that showProgress() will be called every time a new item is emitted.
  3. Don’t forget to call hideProgress() when you are just about to display a result.

Build and run your project. You should see the progress bar appearing when you initiate the search:

progressbar

Observe Text Changes

What if you want to perform search automatically when the user types some text, just like Google?

First, you need to subscribe to TextView text changes. Add the following function to the CheeseActivity class:

// 1
private fun createTextChangeObservable(): Observable<String> {
  // 2
  val textChangeObservable = Observable.create<String> { emitter ->
    // 3
    val textWatcher = object : TextWatcher {

      override fun afterTextChanged(s: Editable?) = Unit

      override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit

      // 4
      override fun onTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
        s?.toString()?.let { emitter.onNext(it) }
      }

    }

    // 5
    queryEditText.addTextChangedListener(textWatcher)

    // 6
    emitter.setCancellable {
      queryEditText.removeTextChangedListener(textWatcher)
    }
  }

  // 7
  return textChangeObservable
}

Here’s the play-by-play of each step above:

  1. Declare a function that will return an observable for text changes.
  2. Create textChangeObservable with create(), which takes an ObservableOnSubscribe.
  3. When an observer makes a subscription, the first thing to do is to create a TextWatcher.
  4. You aren’t interested in beforeTextChanged() and afterTextChanged(). When the user types and onTextChanged() triggers, you pass the new text value to an observer.
  5. Add the watcher to your TextView by calling addTextChangedListener().
  6. Don’t forget to remove your watcher. To do this, call emitter.setCancellable() and overwrite cancel() to call removeTextChangedListener()
  7. Finally, return the created observable.

To see this observable in action, replace the declaration of searchTextObservable in onStart() of CheeseActivity as follows:

val searchTextObservable = createTextChangeObservable()

Build and run your app. You should see the search kick off when you start typing text in the TextView:

text-view-changes-simple

Filter Queries by Length

It doesn’t make sense to search for queries as short as a single letter. To fix this, let’s introduce the powerful filter operator.

filter passes only those items which satisfy a particular condition. filter takes in a Predicate, which is an interface that defines the test that input of a given type needs to pass, with a boolean result. In this case, the Predicate takes a String and returns true if the string’s length is two or more characters.

Replace return textChangeObservable in createTextChangeObservable() with the following code:

return textChangeObservable.filter { it.length >= 2 }

Everything will work exactly the same, except that text queries with length less than 2 won’t get sent down the chain.

Run the app; you should see the search kick off only when you type the second character:

filter-0

filter-1

Debounce operator

You don’t want to send a new request to the server every time the query is changed by one symbol.

debounce is one of those operators that shows the real power of reactive paradigm. Much like the filter operator, debounce, filters items emitted by the observable. But the decision on whether the item should be filtered out is made not based on what the item is, but based on when the item was emitted.

debounce waits for a specified amount of time after each item emission for another item. If no item happens to be emitted during this wait, the last item is finally emitted:

719f0e58_1472502674

In createTextChangeObservable(), add the debounce operator just below the filter so that the return statement will look like the following code:

return textChangeObservable
      .filter { it.length >= 2 }
      .debounce(1000, TimeUnit.MILLISECONDS) // add this line

Run the app. You’ll notice that the search begins only when you stop making quick changes:

debounce-500px

debounce waits for 1000 milliseconds before emitting the latest query text.

Merge Operator

You started by creating an observable that reacted to button clicks and then implemented an observable that reacts to text field changes. But how do you react to both?

There are a lot of operators to combine observables. The most simple and useful one is merge.

merge takes items from two or more observables and puts them into a single observable:

ae08759b_1472502259

Change the beginning of onStart() to the following:

val buttonClickStream = createButtonClickObservable()
val textChangeStream = createTextChangeObservable()

val searchTextObservable = Observable.merge<String>(buttonClickStream, textChangeStream)

Run your app. Play with the text field and the search button; the search will kick off either when you finish typing two or more symbols or when you simply press the Search button.

RxJava and Activity/Fragment lifecycle

Remember those setCancellable methods you set up? They won’t fire until the observable is unsubscribed.

The Observable.subscribe() call returns a Disposable. Disposable is an interface that has two methods:

public interface Disposable {
  void dispose();  // ends a subscription
  boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
}

Add the following property to CheeseActivity:

private lateinit var disposable: Disposable

In onStart(), set the returned value of subscribe() to disposable with the following code (only the first line changes):

disposable = searchTextObservable // change this line
      .observeOn(AndroidSchedulers.mainThread())
      .doOnNext { showProgress() }
      .observeOn(Schedulers.io())
      .map { cheeseSearchEngine.search(it) }
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe {
        hideProgress()
        showResult(it)
      }

Since you subscribed to the observable in onStart(), onStop() would be a perfect place to unsubscribe.

Add the following code to CheeseActivity.kt:

@Override
override fun onStop() {
  super.onStop()
  if (!disposable.isDisposed) {
    disposable.dispose()
  }
}

And that’s it! Build and run the app. You won’t “observe” any changes yourself, but now the app is successfully avoiding RxJava memory leaks. :]

Where to Go From Here?

You can download the final project from this tutorial here.

You’ve learned a lot in this tutorial. But that’s only a glimpse of the RxJava world. For example, there is RxBinding, a library that includes most of the Android View APIs. Using this library, you can create a click observable by just calling RxView.clicks(viewVariable).

To learn more about RxJava refer to the ReactiveX documentation.

What’s new in RxJava 2

The second version of RxJava is quite different from the first one, since RxJava 2 was completely rewritten. You should get acquainted with some entirely new classes like Flowable and Maybe.

With Flowable, you can now avoid getting such well-known problems as MissingBackpressureException or OutOfMemoryError. You can use it to handle a flow of 10k+ elements.

Maybe is a combination of Single and Completable. Like Single, it can receive at most one item or fail, and like Completable it can finish successfully without any received items or fail.

Find some time to skim all the changelog of RxJava2 :]

If you have any comments or questions, don’t hesitate to join the discussion below!

Team

Each tutorial at www.raywenderlich.com is created by a team of dedicated developers so that it meets our high quality standards. The team members who worked on this tutorial are:

Irina Galata

21 years old Android developer from Dnipro, Ukraine.
Mostly focused on complex animations and OpenGL ES especially.

You can find me on Medium and GitHub

Other Items of Interest

Big Book SaleAll raywenderlich.com iOS 11 books on sale for a limited time!

raywenderlich.com Weekly

Sign up to receive the latest tutorials from raywenderlich.com each week, and receive a free epic-length tutorial as a bonus!

Advertise with Us!

PragmaConf 2016 Come check out Alt U

Our Books

Our Team

Video Team

... 19 total!

iOS Team

... 73 total!

Android Team

... 21 total!

Unity Team

... 11 total!

Articles Team

... 15 total!

Resident Authors Team

... 18 total!

Podcast Team

... 7 total!

Recruitment Team

... 9 total!