RxJava Combining Operators

In this tutorial, you’ll use RxJava combining operators to merge, filter and transform your data into succinct and reusable streams. By Prashant Barahi.

Assessing the Switch Operator

switchOnNext subscribes to an Observable that emits Observables. Whenever the outer source emits a new Observable, the operator unsubscribes from the previously emitted one and starts relaying items from the new one instead. Any later emissions from the previous Observable are dropped.

switchOnNext Operator

Open PlaceListViewModel.kt and add the snippet below inside switchOnNext():

disposeCurrentlyRunningStreams()

// 1
val outerSource = Observable.interval(3, TimeUnit.SECONDS)
    .doOnNext {
      Log.i(LOG_TAG, "Emitted by OuterSource: $it")
    }

// 2
val innerSource = Observable.interval(1, TimeUnit.SECONDS)
    .doOnSubscribe {
      Log.i(LOG_TAG, "Starting InnerSource")
    }

// 3
Observable.switchOnNext(
    outerSource.map { return@map innerSource }
)
    .doOnNext {
      Log.i(LOG_TAG, "Relayed items $it")
    }
    .subscribeOn(Schedulers.single())
    .observeOn(Schedulers.single())
    .subscribe().addTo(disposable)

Here’s what the code above does. It:

  1. Creates a source, outerSource, that emits an item every three seconds
  2. Creates another source, innerSource, that emits an item every second
  3. Uses switchOnNext() to subscribe to a fresh innerSource each time outerSource emits an item (i.e. on every onNext). When the outer Observable emits another item, the current inner subscription is discarded and a new one is created, which then supplies the items. Between two outer emissions (three seconds apart), the inner Observable has time for three items (one per second). The third coincides with the next outer emission, so it isn't relayed downstream (similar to the marble diagram above).

Build and run, then tap Switch On Next inside Demo. Note that every feature inside Demo prints to Logcat and isn't visible in TouRx's UI. So open Logcat and enter --> in the filter box to cut out the noise. You'll get something like this:

2020-08-21 22:02:18.964 I/-->: Emitted by OuterSource: 0
2020-08-21 22:02:18.964 I/-->: Starting InnerSource
2020-08-21 22:02:19.965 I/-->: Relayed items 0
2020-08-21 22:02:20.964 I/-->: Relayed items 1
2020-08-21 22:02:21.964 I/-->: Emitted by OuterSource: 1
2020-08-21 22:02:21.964 I/-->: Starting InnerSource
2020-08-21 22:02:22.964 I/-->: Relayed items 0
2020-08-21 22:02:23.964 I/-->: Relayed items 1
2020-08-21 22:02:24.964 I/-->: Emitted by OuterSource: 2
2020-08-21 22:02:24.964 I/-->: Starting InnerSource
2020-08-21 22:02:25.964 I/-->: Relayed items 0
2020-08-21 22:02:26.964 I/-->: Relayed items 1
2020-08-21 22:02:27.964 I/-->: Emitted by OuterSource: 3
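
The pattern in this log can be reasoned about with a small model. The sketch below is plain Kotlin and doesn't use RxJava. simulateSwitchOnNext() and its parameters are hypothetical names introduced for illustration, and the model assumes that the outer source wins when both sources tick in the same second, as the log above suggests.

```kotlin
// Plain-Kotlin model of the switchOnNext timeline; it does not use RxJava.
// Assumption: item n of an interval source arrives at (n + 1) * period seconds,
// and an outer emission takes priority over an inner one in the same second.
fun simulateSwitchOnNext(outerPeriod: Int, innerPeriod: Int, totalSeconds: Int): List<String> {
    val log = mutableListOf<String>()
    var innerStart = -1 // second at which the current inner source was subscribed; -1 means none yet
    for (t in 1..totalSeconds) {
        if (t % outerPeriod == 0) {
            // Outer emission: the old inner source is dropped and a new one starts counting from zero.
            log += "t=$t outer ${t / outerPeriod - 1}"
            innerStart = t
        } else if (innerStart >= 0 && (t - innerStart) % innerPeriod == 0) {
            // Inner emission from the currently active inner source is relayed downstream.
            log += "t=$t relayed ${(t - innerStart) / innerPeriod - 1}"
        }
    }
    return log
}
```

Calling simulateSwitchOnNext(3, 1, 9) yields the entries t=3 outer 0, t=4 relayed 0, t=5 relayed 1, t=6 outer 1, t=7 relayed 0, t=8 relayed 1 and t=9 outer 2, which is the same rhythm as the Logcat output: two relayed items between consecutive outer emissions.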

Next, you’ll learn about the join operator.

Assessing the join Operator

The join operator combines the items emitted by two sources whenever an item from one source is emitted while the window of an item from the other source is still open. In other words, it selects which items to combine based on overlaps between the streams. Each window is implemented as an Observable whose lifespan begins when either source emits an item and ends when that window-defining Observable emits an item or completes. As long as an item's window is open, it can combine with any item emitted by the other source.

join Operator

To see the join operator in action, open PlaceListViewModel.kt, and inside demonstrateJoinBehavior(), paste the following:

disposeCurrentlyRunningStreams()

// 1
val firstObservable = Observable.interval(1000, TimeUnit.MILLISECONDS)
    .map {
      return@map "SOURCE-1 $it"
    }
// 2
val secondObservable = Observable.interval(3000, TimeUnit.MILLISECONDS)
    .map { return@map "SOURCE-2 $it" }

// 3
val firstWindow = Function<String, Observable<Long>> {
  Observable.timer(0, TimeUnit.SECONDS)
}
val secondWindow = Function<String, Observable<Long>> {
  Observable.timer(0, TimeUnit.SECONDS)
}

// 4
val resultSelector = BiFunction<String, String, String> { t1, t2 ->
  return@BiFunction "$t1, $t2"
}
    

// 5
firstObservable.join(secondObservable, firstWindow, secondWindow, resultSelector)
    .doOnNext {
      Log.i(LOG_TAG, it)
    }
    .subscribeOn(Schedulers.single())
    .observeOn(Schedulers.single())
    .subscribe().addTo(disposable)

Now, build and run. Tap Demo in the top-right menu, then tap Join. This runs demonstrateJoinBehavior() in PlaceListViewModel. Open Logcat to assess the logs. Your logs will look similar to this:

2020-08-21 22:18:20.562 I/-->: SOURCE-1 4, SOURCE-2 1
2020-08-21 22:18:23.562 I/-->: SOURCE-1 7, SOURCE-2 2
2020-08-21 22:18:26.562 I/-->: SOURCE-1 10, SOURCE-2 3
2020-08-21 22:18:29.562 I/-->: SOURCE-1 13, SOURCE-2 4
2020-08-21 22:18:32.562 I/-->: SOURCE-1 16, SOURCE-2 5
2020-08-21 22:18:35.562 I/-->: SOURCE-1 19, SOURCE-2 6
2020-08-21 22:18:38.562 I/-->: SOURCE-1 22, SOURCE-2 7
2020-08-21 22:18:41.562 I/-->: SOURCE-1 25, SOURCE-2 8
2020-08-21 22:18:44.562 I/-->: SOURCE-1 28, SOURCE-2 9

It’s time to break down the reason for this log! The code above:

  1. Creates a source, firstObservable, that emits items every second
  2. Creates a second source, secondObservable, that emits items every three seconds
  3. Initializes two windows — firstWindow and secondWindow — which define the lifespan of the window for firstObservable and secondObservable, respectively
  4. Declares a resultSelector that couples the emitted items into a String
  5. Uses join() to perform a join operation on the Observables created in the first two steps. Since each window is zero seconds wide, firstObservable and secondObservable must emit at the same time for their items to be coupled and relayed downstream. With the specified source intervals and window lengths, the overlaps occur every three seconds.
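
The overlap rule from the last step can be sketched as a small model. The code below is plain Kotlin and doesn't use RxJava. Emission, intervalEmissions() and joinPairs() are hypothetical names introduced for illustration, and the model assumes ideal timing in which a window that closes at the very moment an item arrives still counts as open.

```kotlin
// Plain-Kotlin model of join's overlap rule; it does not use RxJava.
// All times are in milliseconds since subscription.
data class Emission(val label: String, val time: Long)

// Assumption: item n of an interval source arrives at (n + 1) * period.
fun intervalEmissions(name: String, periodMs: Long, untilMs: Long): List<Emission> =
    (0L until untilMs / periodMs).map { n -> Emission("$name $n", (n + 1) * periodMs) }

// Two items are joined when the later one arrives while the earlier one's window is still open.
// Boundaries are treated as inclusive, which is an idealization of real scheduler timing.
fun joinPairs(
    left: List<Emission>, leftWindowMs: Long,
    right: List<Emission>, rightWindowMs: Long
): List<String> {
    val pairs = mutableListOf<Pair<Long, String>>()
    for (l in left) {
        for (r in right) {
            val overlaps = l.time <= r.time + rightWindowMs && r.time <= l.time + leftWindowMs
            // The pair is produced at the moment the later of the two items arrives.
            if (overlaps) pairs += maxOf(l.time, r.time) to "${l.label}, ${r.label}"
        }
    }
    return pairs.sortedBy { it.first }.map { it.second }
}
```

With left = intervalEmissions("SOURCE-1", 1000L, 9000L) and right = intervalEmissions("SOURCE-2", 3000L, 9000L), calling joinPairs(left, 0L, right, 0L) produces one pairing every three seconds, and widening the right window to 1000L adds a second pairing after each SOURCE-2 item. The exact indices in a real run depend on subscription and scheduler timing, so they may be offset from the model's.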

Try experimenting with different window lengths to learn more about the join operator. Next, you’ll learn about an operator similar to join: groupJoin.

Assessing the groupJoin Operator

The groupJoin operator is similar to the join operator. The difference lies in resultSelector, the argument that defines how the items should be combined: it receives each individual item emitted by the left source together with an Observable that emits every right-source item whose window overlaps with that item's window.

GroupJoin Operator

Time to see the groupJoin operator in action! Open PlaceListViewModel.kt, and in demonstrateGroupJoin(), place the following code:

disposeCurrentlyRunningStreams()
// 1
val leftSource = Observable.interval(1, TimeUnit.SECONDS)
    .map { return@map "SOURCE-1 $it" }

val rightSource = Observable.interval(5, TimeUnit.SECONDS)
    .map { return@map "SOURCE-2 $it" }

// 2
val leftWindow = Function<String, Observable<Long>> {
  Observable.timer(0, TimeUnit.SECONDS)
}

val rightWindow = Function<String, Observable<Long>> {
  Observable.timer(3, TimeUnit.SECONDS)
}

// 3
val resultSelector = BiFunction<String, Observable<String>, Observable<Pair<String, String>>> { t1, t2 ->
  return@BiFunction t2.map {
    return@map Pair(t1, it)
  }
}

leftSource.groupJoin(rightSource, leftWindow, rightWindow, resultSelector)
    .concatMap {
      return@concatMap it
    }
    .subscribeOn(Schedulers.single())
    .observeOn(Schedulers.single())
    .doOnNext {
      Log.i(LOG_TAG, it.toString())
    }
    .subscribe().addTo(disposable)

Finally, build and run. Click on the top-right menu and select Group Join inside Demo. Then, open the Logcat to see the printed logs:

2020-08-21 23:17:17.756 I/-->: (SOURCE-1 4, SOURCE-2 0)
2020-08-21 23:17:18.756 I/-->: (SOURCE-1 5, SOURCE-2 0)
2020-08-21 23:17:19.756 I/-->: (SOURCE-1 6, SOURCE-2 0)
2020-08-21 23:17:20.756 I/-->: (SOURCE-1 7, SOURCE-2 0)
2020-08-21 23:17:22.756 I/-->: (SOURCE-1 9, SOURCE-2 1)
2020-08-21 23:17:23.756 I/-->: (SOURCE-1 10, SOURCE-2 1)
2020-08-21 23:17:24.756 I/-->: (SOURCE-1 11, SOURCE-2 1)
2020-08-21 23:17:25.756 I/-->: (SOURCE-1 12, SOURCE-2 1)
2020-08-21 23:17:27.756 I/-->: (SOURCE-1 14, SOURCE-2 2)
2020-08-21 23:17:28.756 I/-->: (SOURCE-1 15, SOURCE-2 2)
2020-08-21 23:17:29.756 I/-->: (SOURCE-1 16, SOURCE-2 2)
2020-08-21 23:17:30.756 I/-->: (SOURCE-1 17, SOURCE-2 2)
2020-08-21 23:17:32.756 I/-->: (SOURCE-1 19, SOURCE-2 3)
2020-08-21 23:17:33.756 I/-->: (SOURCE-1 20, SOURCE-2 3)
2020-08-21 23:17:34.756 I/-->: (SOURCE-1 21, SOURCE-2 3)
2020-08-21 23:17:35.756 I/-->: (SOURCE-1 22, SOURCE-2 3)

Here’s a breakdown of what the code above is doing:

  1. leftSource emits an item every second, whereas rightSource emits an item every five seconds.
  2. leftWindow and rightWindow are windows for leftSource and rightSource, respectively. Note the difference in the lifespan of these windows.
  3. The signature of resultSelector is the important distinction between the groupJoin and join operators. The second argument of the lambda, t2, is an Observable of the rightSource items whose windows are open when the leftSource item t1 arrives. Since rightWindow has a lifespan of three seconds and leftSource emits every second, each rightSource item is matched with the leftSource items emitted during that three-second window. The lambda maps each match into a Pair before sending it downstream.
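
The grouping described in these steps can be sketched as a small model. The code below is plain Kotlin and doesn't use RxJava. emissionTime(), groupJoinZeroLeftWindow() and flattenGroups() are hypothetical names introduced for illustration, and the model assumes ideal timing with inclusive window boundaries and a left window that is zero seconds wide, as in the demo.

```kotlin
// Plain-Kotlin model of the groupJoin demo; it does not use RxJava.
// Times are whole seconds since subscription.

// Assumption: item n of an interval source arrives at (n + 1) * period.
fun emissionTime(index: Int, periodSec: Int): Int = (index + 1) * periodSec

// For every left item, collect the right items whose window is open at that moment.
// Every left item gets a group, even when the group turns out to be empty.
fun groupJoinZeroLeftWindow(
    leftPeriodSec: Int, rightPeriodSec: Int, rightWindowSec: Int, untilSec: Int
): List<Pair<String, List<String>>> {
    val leftCount = untilSec / leftPeriodSec
    val rightCount = untilSec / rightPeriodSec
    return (0 until leftCount).map { i ->
        val t = emissionTime(i, leftPeriodSec)
        val openRights = (0 until rightCount)
            .filter { j ->
                val start = emissionTime(j, rightPeriodSec)
                t in start..(start + rightWindowSec) // inclusive boundaries (idealized)
            }
            .map { j -> "SOURCE-2 $j" }
        "SOURCE-1 $i" to openRights
    }
}

// Flattening step: turn each (left, group) entry into individual pairs, in order.
fun flattenGroups(groups: List<Pair<String, List<String>>>): List<Pair<String, String>> =
    groups.flatMap { (left, rights) -> rights.map { right -> left to right } }
```

Calling flattenGroups(groupJoinZeroLeftWindow(1, 5, 3, 10)) yields (SOURCE-1 4, SOURCE-2 0) through (SOURCE-1 7, SOURCE-2 0), followed by (SOURCE-1 9, SOURCE-2 1), which lines up with the first entries of the log above. The unflattened result also shows why SOURCE-1 8 never appears: its group is empty because no right window is open at that second.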

Experiment with varying window sizes to get more familiar with the groupJoin operator.