RxJava Combining Operators

In this tutorial, you’ll use RxJava combining operators to merge, filter and transform your data into succinct and reusable streams. By Prashant Barahi.


Using the merge Operator to Load Data ASAP

The merge operator subscribes to all the passed sources simultaneously and relays their emissions as soon as they’re available, so it doesn’t guarantee the sequential ordering of the emissions. Because it never waits for one source to complete before listening to the next, it’s also suitable for handling infinite sources.

Merge Operator

RxJava also provides an instance method, mergeWith(), to merge two sources.
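To see the ordering behavior in isolation, here is a minimal sketch outside the sample app. It assumes RxJava 2 (the io.reactivex packages) and uses a TestScheduler for virtual time; the names slow, fast and mergeDemo are hypothetical and not part of the project.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

fun mergeDemo(): List<String> {
    // Virtual time keeps the result deterministic without real waiting.
    val scheduler = TestScheduler()

    // Hypothetical sources: "slow" emits after 2 s, "fast" after 1 s.
    val slow = Observable.just("slow").delay(2, TimeUnit.SECONDS, scheduler)
    val fast = Observable.just("fast").delay(1, TimeUnit.SECONDS, scheduler)

    val received = mutableListOf<String>()
    // mergeWith() subscribes to both sources right away.
    slow.mergeWith(fast).subscribe { received += it }

    scheduler.advanceTimeBy(3, TimeUnit.SECONDS)
    return received
}

fun main() {
    println(mergeDemo()) // [fast, slow]
}
```

Even though slow is the receiver of mergeWith(), the item from fast arrives first because it becomes available first.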

Open SplashViewModel.kt. The class contains populateData(), and the splash screen loads using this method when the app starts:

fun populateData(places: Places, costs: List<Cost>) {
  val insertPlaceSource = database.getPlaceDao().bulkInsert(places)
      .delay(2, TimeUnit.SECONDS)
      .doOnComplete { Log.i(LOG_TAG, "Completed inserting places into database") }
  val insertCostSource = database.getCostDao().bulkInsert(costs)
      .delay(1, TimeUnit.SECONDS)
      .doOnComplete { Log.i(LOG_TAG, "Completed inserting costs into database") }

  insertPlaceSource.mergeWith(insertCostSource)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeBy(onComplete = onCompleteHandler, onError = onErrorHandler)
      .addTo(disposable)
}

Both insertPlaceSource and insertCostSource bulk insert their corresponding items into the database. Below their definitions, mergeWith() merges the two streams, and the resulting Completable invokes onCompleteHandler only after both of them have completed. Check Logcat to see how the merge operator behaves. Enter --> into the Logcat search box to filter out noise from other apps:

2020-08-21 21:35:24.368 I/-->: Completed inserting costs into database
2020-08-21 21:35:25.366 I/-->: Completed inserting places into database
2020-08-21 21:35:25.366 I/-->: completeHandler after: 2s

In PlaceListActivity, mergeWith() can be used to load the result into the RecyclerView adapter as soon as it’s available. Open PlaceListViewModel.kt and confirm that loadOnReceive() is using mergeWith() to do exactly that.

Build and run. In the menu, tap Load When Received.

Merge Operator Demo

You’ll see that Earth’s places load before Mars’ places. Why? Recall the merge operator’s behavior — unlike the zip operator that waits for all sources to emit, the merge operator relays the emissions as soon as they’re available.

It’s important to remember that ordering isn’t guaranteed when using the merge operator. To maintain the order, RxJava provides the concatenation operator. But before getting into that, you’ll learn about converting UI events into streams and another combining operator — startWith.

Using the startWith Operator to Emit an Item Immediately

The startWith operator returns an Observable that emits a specific item before it begins streaming items that are sent by the source.
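As a quick illustration, the sketch below prepends an item to a finite source. It assumes RxJava 2, where the single-item overload is named startWith(); startWithDemo is a hypothetical name used only for this example.

```kotlin
import io.reactivex.Observable

fun startWithDemo(): List<Int> {
    val received = mutableListOf<Int>()
    Observable.just(1, 2, 3)
        .startWith(0) // emitted before anything from the source
        .subscribe { received += it }
    return received
}

fun main() {
    println(startWithDemo()) // [0, 1, 2, 3]
}
```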

startWith Operator

The Android SDK provides a callback mechanism to perform actions on UI events like button clicks, scroll changes, etc. You can wrap these callbacks with Observable.create() to build an observable source that emits on every UI event, as explained in the Reactive Programming with RxAndroid in Kotlin tutorial.

Open PlaceDetailActivity.kt. Observable.create() is used in conjunction with extension methods to convert UI events to observable sources:

/**
 * Converts the checked change event of [CheckBox] to streams
 */
private fun CheckBox.toObservable(): Observable<Boolean> {
    return Observable.create<Boolean> {
        setOnCheckedChangeListener { _, isChecked ->
            it.onNext(isChecked)
        }
    }.startWith(isChecked)
}

The extension method above returns an Observable that, once subscribed to, relays checked-change events as emissions. Before that, it immediately emits the argument that’s passed to startWith(), which is the isChecked value of the checkbox.

Remember you need to subscribe to the Observable returned by the extension functions to start receiving the events; calling the extension function isn’t enough:

val checkboxSource = checkbox.toObservable()
checkboxSource.subscribe { 
    Log.d(LOG_TAG, "New Checkbox value: $it")    
}

You can subscribe to the stream using the code above.

Now that you’ve learned how to turn UI events into Observables, you’ll learn about the concatenation operator.

Using the concat Operator

The concat operator is similar to the merge operator, with one very important distinction: It emits from its sources sequentially. It won’t move on to the next source until the current one calls onComplete().

Concat Operator

This behavior makes it unsuitable for handling infinite sources, as it’ll forever emit from the current source and keep the others waiting.
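The following sketch shows the sequential behavior with two finite sources. It assumes RxJava 2 and a TestScheduler for virtual time; slow, fast and concatDemo are hypothetical names for illustration only.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

fun concatDemo(): List<String> {
    val scheduler = TestScheduler()

    // Hypothetical sources: "slow" emits after 2 s, "fast" after 1 s.
    val slow = Observable.just("slow").delay(2, TimeUnit.SECONDS, scheduler)
    val fast = Observable.just("fast").delay(1, TimeUnit.SECONDS, scheduler)

    val received = mutableListOf<String>()
    // concat() subscribes to fast only after slow has completed.
    Observable.concat(slow, fast).subscribe { received += it }

    scheduler.advanceTimeBy(5, TimeUnit.SECONDS)
    return received
}

fun main() {
    println(concatDemo()) // [slow, fast]
}
```

The argument order decides the output order, regardless of which source would have been ready first.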

In loadDataInUI() inside PlaceDetailActivity.kt, you can see that combineUsingConcat() uses concat() to combine the events of CheckBox and NumberPicker:

private fun combineUsingConcat(
  booleanObservable: Observable<Boolean>,
  integerObservable: Observable<Int>
): Disposable {
    return Observable.concat(booleanObservable, integerObservable)
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(onNext = { input ->
          Log.i(LOG_TAG, input.toString())
        }).addTo(disposable)
}

These individual Observables are infinite in the sense that they never call onComplete(); they’re disposed only when the activity is destroyed. So, using concat() or concatWith() will only relay items from the first source while the other one is “starved”: concat never gets to subscribe to it. If you check Logcat, you can see that only the values from the first source passed to concat(), here the checkbox, get printed.
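You can reproduce the starvation without any UI. The sketch below assumes RxJava 2 and stands in two PublishSubjects for the checkbox and number picker streams; checks, counts and starvationDemo are hypothetical names.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

fun starvationDemo(): List<Any> {
    // Neither subject ever calls onComplete(), like the UI event streams.
    val checks = PublishSubject.create<Boolean>()
    val counts = PublishSubject.create<Int>()

    val received = mutableListOf<Any>()
    val disposable = Observable.concat<Any>(checks, counts)
        .subscribe { received += it }

    checks.onNext(true)
    counts.onNext(3)      // dropped: concat hasn't subscribed to counts
    checks.onNext(false)

    disposable.dispose()
    return received
}

fun main() {
    println(starvationDemo()) // [true, false]
}
```

Because concat subscribes in argument order, the second source stays unobserved for as long as the first one is alive.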

Despite these gotchas, concat is the go-to operator when ordering is crucial.

You can correct the current UI behavior using combineLatest, which you’ll learn about next.

Using the combineLatest Operator to Correct the UI Behavior

Whenever any of its sources emits, the combineLatest() operator immediately combines that item with the latest emission from every other source. It emits nothing until each source has emitted at least once. This behavior makes it perfect for combining UI inputs.
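Here is a small sketch of that pairing behavior, assuming RxJava 2. The two PublishSubjects stand in for UI inputs; checks, counts and combineLatestDemo are hypothetical names used only here.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject

fun combineLatestDemo(): List<Pair<Boolean, Int>> {
    val checks = PublishSubject.create<Boolean>()
    val counts = PublishSubject.create<Int>()

    val received = mutableListOf<Pair<Boolean, Int>>()
    Observable.combineLatest<Boolean, Int, Pair<Boolean, Int>>(
        checks,
        counts,
        BiFunction<Boolean, Int, Pair<Boolean, Int>> { checked, count ->
            Pair(checked, count)
        }
    ).subscribe { received += it }

    checks.onNext(false) // no output yet: counts has not emitted
    counts.onNext(1)     // (false, 1)
    checks.onNext(true)  // (true, 1)
    counts.onNext(2)     // (true, 2)
    return received
}

fun main() {
    println(combineLatestDemo()) // [(false, 1), (true, 1), (true, 2)]
}
```

Since the first pair appears only after both sources have emitted, giving each UI source an initial value with startWith() means the combined stream produces a result as soon as it is subscribed to.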

combineLatest Operator

Open PlaceDetailActivity.kt, and in loadDataInUI(), remove combineUsingConcat() and replace it with a call to combineUsingCombineLatest(), as shown below:

val isTwoWayTravelObservable = twoWayTravelCheckbox.toObservable()
val totalPassengerObservable = numberOfPassengerPicker.toObservable()
// combineUsingConcat(isTwoWayTravelObservable, totalPassengerObservable)
combineUsingCombineLatest(this, isTwoWayTravelObservable, totalPassengerObservable)

combineUsingCombineLatest() uses combineLatest() to pair the latest value from NumberPicker with the latest isChecked value of Checkbox. Then it uses them to calculate the total price of the trip, as shown in the code below:

private fun combineUsingCombineLatest(
    data: PlaceDetail, booleanObservable: Observable<Boolean>,
    integerObservable: Observable<Int>
) {
  Observable.combineLatest<Boolean, Int, Pair<Boolean, Int>>(
      booleanObservable,
      integerObservable,
      BiFunction { t1, t2 ->
        return@BiFunction Pair(t1, t2)
      })
      .subscribeOn(AndroidSchedulers.mainThread())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeBy(onNext = { input ->
        val passengerCount = input.second
        val isTwoWayTravel = input.first
        resultTextView.text =
            viewModel.calculateTravelCost(data.cost, passengerCount, isTwoWayTravel)
                .toString()
      }).addTo(disposable)
}

Build and run. Tap any of the places. Now the UI should work as expected.

CombineLatest Demo

Next, you’ll learn about mergeDelayError.

Using the mergeDelayError Operator

MockApiService.kt has a fetchFromExperimentalApi(), which returns an error as soon as it’s subscribed to. Using the merge operator to join fetchMarsPlaces(), fetchEarthPlaces() and fetchFromExperimentalApi() will immediately call onErrorHandler — with no time for the first two to emit. With mergeDelayError, you can allow an observer to receive all successfully emitted items without being interrupted by an error.
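The sketch below illustrates the delayed error with three synchronous sources. It assumes RxJava 2.2 or later, where Single.mergeDelayError() accepts several sources and returns a Flowable; failing, mars, earth and mergeDelayErrorDemo are hypothetical stand-ins for the MockApiService calls.

```kotlin
import io.reactivex.Single

fun mergeDelayErrorDemo(): Pair<List<String>, Throwable?> {
    // Hypothetical stand-ins for the three API calls.
    val failing = Single.error<String>(IllegalStateException("experimental API failed"))
    val mars = Single.just("Mars")
    val earth = Single.just("Earth")

    val received = mutableListOf<String>()
    var error: Throwable? = null

    // The error from `failing` is held back until the other sources terminate.
    Single.mergeDelayError(failing, mars, earth)
        .subscribe({ received += it }, { error = it })

    return received to error
}

fun main() {
    val (items, error) = mergeDelayErrorDemo()
    println(items)          // [Mars, Earth]
    println(error?.message) // experimental API failed
}
```

The failing source is listed first, yet both successful items still reach the observer before the error does.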

mergeDelayError Operator

To see the behavior of mergeDelayError, open PlaceListViewModel.kt and place the following code inside loadExperimental():

startLoading()
Single.mergeDelayError(
    service.fetchFromExperimentalApi(),
    service.fetchMarsPlaces(),
    service.fetchEarthPlaces()
).subscribeOn(Schedulers.io())
    .doOnSubscribe { recordStartTime() }
    .observeOn(AndroidSchedulers.mainThread(), true)
    .subscribeBy(
        onNext = onDataLoadHandler,
        onComplete = onCompleteHandler,
        onError = onErrorHandler
    )
    .addTo(disposable)

Build and run. Tap the “Experimental Features (UNSTABLE)” item from the menu.

mergeDelayError Operator

You’ll see a toast with an error message only after the two sources, fetchMarsPlaces() and fetchEarthPlaces(), have emitted. Despite occurring before both streams complete, the error is sent only after they do.

Next, you’ll learn about the switch operator.