RxJava Combining Operators

In this tutorial, you’ll use RxJava combining operators to merge, filter and transform your data into succinct and reusable streams. By Prashant Barahi.

RxJava is the Java implementation of ReactiveX, a library for reactive programming that provides a way to put static items into motion. It comes packed with many intuitive APIs to merge, filter, transform or otherwise alter the stream of data.

But what does “putting static items into motion” mean? Think of it this way — as you’re reading this tutorial, your brain works in an RxJava-like fashion. It converts static text into a stream of words, and then it transforms the words into your inner voice. Some information is even discarded, as though your mind is filtering it out. And if you’re listening to music while reading, your brain is combining the streams from both sources, much like RxJava’s combining operators.

You’ll see several of these nifty features explained in this tutorial. You’ll also:

  • Understand RxJava combining operators and their behavior
  • Use combining operators to consolidate multiple streams
  • Convert UI events into streams and then combine them

You’ll implement an app called TouRx. With TouRx, you can find your next vacation, calculate the travel cost and even configure the UI if you’re super picky about how you want your vacations shown to you. :]

TouRx App Tour

So, pack your bags and get ready!

To start learning RxJava, read this Reactive Programming with RxAndroid in Kotlin tutorial.

Note: This tutorial assumes you’re familiar with the basics of Android development in Kotlin with RxJava. If you’re new to Kotlin and Android, check out Kotlin For Android: An Introduction and this Beginning Android Development with Kotlin tutorial.

Getting Started

Download the starter project by clicking the Download Materials button at the top or bottom of the tutorial. Launch Android Studio 4.0 or later and select Open an existing Android Studio project. Then navigate to and open the starter project’s folder.

Build and run the app. You’ll see the following screen:

TouRx Main Screen

You’ll see a list of places from Earth and Mars. Tapping on any of them will show the trip details and cost. In the top-right corner, there’s a menu that lets you configure the app behavior. Most of these functions don’t do anything in the starter project yet, as their corresponding PlaceListViewModel functions are blank. You’ll fill in each one as you progress through this tutorial.

The important files to note are:

  • BaseViewModel.kt, which contains an abstract ViewModel class that provides RxJava-compatible handlers like onErrorHandler, onDataLoadHandler, onSuccessHandler and onCompleteHandler. These update the UI state and use LiveData for their reactive interface. These handlers must operate on the main thread.
  • BaseActivity.kt, which contains the base activity class that notifies child classes whenever there’s a change in the state.
  • ApiService.kt, which provides the blueprint to handle IO operations. MockApiService is an implementation that simulates “expensive” IO operations using a Room database while also adding some delay in every call. It’s important to note that fetchMarsPlaces() is slower than fetchEarthPlaces().
  • State.kt, which represents the states of the IO operations.

The app needs to perform multiple IO operations to display the data. You’ll use RxJava’s combining operators in the upcoming sections to accomplish this task. But first, you’ll learn more about what they are.

What Are RxJava Combining Operators?

In the Reactive Programming with RxAndroid in Kotlin: An Introduction tutorial, you learned about RxJava’s Observable and Flowable and how to operate on them. With combining operators, you can combine and consolidate them.

Some of the combining operators provided by RxJava are:

  • merge
  • concat
  • combineLatest
  • zip
  • amb
  • startWith
  • mergeDelayError
  • switchOnNext
  • join
  • groupJoin

Note: For most of the static operators listed above, RxJava also provides a corresponding instance method, typically ending in With, such as mergeWith(), concatWith() and zipWith(). RxKotlin provides a Kotlin-friendly way of using RxJava by delegating the calls to the corresponding RxJava method.
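
To make the static-versus-instance distinction concrete, here is a minimal sketch that assumes RxJava 2 is on the classpath. The sources and their values are hypothetical and not part of TouRx. It uses concat because its output order is deterministic.

```kotlin
import io.reactivex.Observable

// Hypothetical sources used only for illustration.
private fun first() = Observable.just("A1", "A2")
private fun second() = Observable.just("B1", "B2")

// Static operator: every source is passed in as an argument.
fun concatStatic(): List<String> =
    Observable.concat(first(), second()).toList().blockingGet()

// Instance operator: called on one source, taking the other as an argument.
fun concatInstance(): List<String> =
    first().concatWith(second()).toList().blockingGet()

fun main() {
    println(concatStatic())   // [A1, A2, B1, B2]
    println(concatInstance()) // [A1, A2, B1, B2]
}
```

Both forms produce the same stream, so the choice is mostly about readability. The instance form tends to read better when you’re already in the middle of a chain.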

You’ll learn about the behavior of these operators in the upcoming sections, starting with the ambiguous operator.

Using the amb Operator to Load the Fastest Source

The Ambiguous operator, better known as amb, makes its sources “compete” against each other. The fastest source to emit a result gets relayed down the chain, while all other sources are disposed. RxJava also provides ambWith() to perform the ambiguous operation using only two sources.

Ambiguous Operator

On the listing screen, you’ll load whichever source responds first by using ambWith(). Open PlaceListViewModel.kt and place the following code inside loadTheQuickestOne():

startLoading()
val earthSource = service.fetchEarthPlaces()
val marsSource = service.fetchMarsPlaces()
    .doOnDispose { Log.d(LOG_TAG, "Disposing mars sources") }

earthSource.ambWith(marsSource)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe { recordStartTime() }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeBy(onSuccess = onSuccessHandler, onError = onErrorHandler)
    .addTo(disposable)

In the code above, earthSource responds (or emits) faster than marsSource. The streams are being combined with ambWith, so the result from earthSource gets displayed while marsSource gets disposed.

Build and run. Tap Load The Quickest One from the menu.

Ambiguous Operator Demo

When you have many streams, the static method amb() can take in a collection of Observables, allowing you to specify a large number of sources while letting only the quickest one move downstream.
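
As a rough sketch of the static form, assuming RxJava 2, the snippet below races three hypothetical sources. The names and delays are made up for illustration. It blocks only so that it can run as a plain Kotlin program; in the app you’d subscribe as shown earlier.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

// Returns the first item from whichever source emits first.
// The losing sources are disposed by amb().
fun fastestOf(sources: List<Observable<String>>): String =
    Observable.amb(sources).blockingFirst()

fun main() {
    // Hypothetical sources with artificial delays.
    val sources = listOf(
        Observable.just("slow").delay(300, TimeUnit.MILLISECONDS),
        Observable.just("fast").delay(50, TimeUnit.MILLISECONDS),
        Observable.just("medium").delay(150, TimeUnit.MILLISECONDS)
    )
    // With these delays, "fast" is expected to win the race.
    println(fastestOf(sources))
}
```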

In the next section, you’ll learn about the zip operator and how it can simultaneously accommodate the content of multiple streams.

Using the zip Operator to Load All Sources At Once

The zip operator and its instance method flavor, zipWith(), allow you to take an emission from each Observable source and combine them into a single emission. You must pass in a lambda, which defines how they combine, as the last argument.
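
Here is a minimal sketch of zip with an explicit combiner, assuming RxJava 2. The place names and costs are hypothetical sample values, not data from the project.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

// Pairs the nth name with the nth cost and formats them as one string.
fun labelPlaces(): List<String> {
    // Hypothetical sample data.
    val names = Observable.just("Kathmandu", "Olympus Mons")
    val costs = Observable.just(120, 950)

    // The last argument defines how one emission from each source combines.
    return Observable.zip(
        names,
        costs,
        BiFunction<String, Int, String> { name, cost -> "$name: $cost" }
    ).toList().blockingGet()
}

fun main() {
    println(labelPlaces()) // [Kathmandu: 120, Olympus Mons: 950]
}
```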

Zip Operator

RxKotlin provides a simpler zipWith() extension function that combines the emissions from two sources into a Pair, so you don’t have to supply the lambda as the last argument when using it.

Open PlaceDetailViewModel.kt. You can see that loadPlaceDetails() uses zipWith from RxKotlin to combine two sources into a Pair:

fun loadPlaceDetails(id: Int) {
  startLoading()
  val costSource = service.fetchCostById(id)
  val placeSource = service.fetchPlaceById(id)
  
  costSource.zipWith(placeSource)
      .subscribeOn(Schedulers.io())
      .doOnSubscribe { recordStartTime() }
      .observeOn(AndroidSchedulers.mainThread())
      .map {
        return@map PlaceDetail(cost = it.first, place = it.second)
      }
      .subscribeBy(onSuccess = onSuccessHandler, onError = onErrorHandler)
      .addTo(disposable)
}    

Launch the app and tap any of the places to view its details. costSource and placeSource are IO operations with varying delays. However, the data in PlaceDetailActivity appears as if both loaded at the same time. This is because zipWith waits for the emission from both sources, couples them into a Pair and then relays the Pair down the chain.

Note: The zipWith used here is an extension from RxKotlin and differs from RxJava’s zipWith in its input arguments. RxKotlin’s version takes in one argument only and relays the Pair of the emissions down the chain. So, the import statement must look like import io.reactivex.rxkotlin.zipWith.
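
The two flavors can be compared side by side. This is a sketch that assumes RxJava 2 and RxKotlin 2 are on the classpath, and the values are hypothetical.

```kotlin
import io.reactivex.Single
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.zipWith

// RxKotlin extension: one argument, emits a Pair.
fun pairWithRxKotlin(): Pair<Int, String> =
    Single.just(100).zipWith(Single.just("Mars")).blockingGet()

// RxJava member method: the second argument defines how the values combine.
fun combineWithRxJava(): String =
    Single.just(100)
        .zipWith(
            Single.just("Mars"),
            BiFunction<Int, String, String> { cost, place -> "$place costs $cost" }
        )
        .blockingGet()

fun main() {
    println(pairWithRxKotlin())  // (100, Mars)
    println(combineWithRxJava()) // Mars costs 100
}
```

If the one-argument call fails to compile, the missing RxKotlin import is the most likely cause.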

Zipping multiple Observables is useful when you need results from multiple sources simultaneously. Open PlaceListViewModel.kt and put the following code inside loadAllAtOnce():

startLoading()
service.fetchEarthPlaces()
    .zipWith(service.fetchMarsPlaces())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe { recordStartTime() }
    // Destructure the Pair and join both lists into one.
    .map { (earthPlaces, marsPlaces) -> earthPlaces + marsPlaces }
    .subscribeBy(onSuccess = onSuccessHandler, onError = onErrorHandler)
    .addTo(disposable)

Build and run, and tap All At Once in the menu.

Zip Operator Demo

You’ll see that the data from the Mars and Earth sources load simultaneously, despite the varying delays.

With zip, an emission from one source waits to be paired with an emission from every other source. So when one of the sources completes, any unpaired emissions from the others are dropped. This behavior also means that faster sources have to wait for slower sources to provide emissions to pair with.
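
The dropping behavior is easy to see with sources of unequal length. The following sketch assumes RxJava 2 and uses hypothetical values:

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

// Zips three numbers with two letters. The third number never finds a partner.
fun zipUneven(): List<String> {
    val numbers = Observable.just(1, 2, 3)
    val letters = Observable.just("a", "b")

    return Observable.zip(
        numbers,
        letters,
        BiFunction<Int, String, String> { number, letter -> "$number$letter" }
    ).toList().blockingGet()
}

fun main() {
    println(zipUneven()) // [1a, 2b]
}
```

The 3 is dropped because letters completes before it can supply a third item.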

Now that you know how to use the zip operator, you’ll learn about the merge operator.