Home Android & Kotlin Books Reactive Programming with Kotlin

3
Subjects Written by Alex Sullivan & Scott Gardner

You’ve gotten a handle on what an Observable is, how to create one, how to subscribe to it, and how to dispose of things when you’re done. Observables are a fundamental part of RxJava, but a common need when developing apps is to manually add new values onto an Observable at runtime that will then be emitted to subscribers. What you want is something that can act as both an Observable and as an observer. And that something is called a subject.

In this chapter, you’re going to learn about the different types of subjects in RxJava, see how to work with each one and why you might choose one over another based on some common use cases.

Getting started

Open the starter project for this chapter in IntelliJ IDEA and add the following code to the Main.kt file:

exampleOf("PublishSubject") {
  val publishSubject = PublishSubject.create<Int>()
}

Here, you create a PublishSubject using a static method create. The class is aptly named, because, like a newspaper publisher, it will receive information and then turn around and publish it to subscribers, possibly after modifying that information in some way first. The subject here is of type Int, so it can only receive and publish integers. After being instantiated, it’s ready to receive data.

Add the following code to the example:

publishSubject.onNext(0)

This sends a new integer into the subject. The console doesn’t print out anything yet because there are no observers. Create one by adding the following code to the example:

val subscriptionOne = publishSubject.subscribe { int ->
  println(int)
}

You created a subscription to publishSubject just like in the last chapter, printing next events. You’re using the default RxJava subscribe method rather than the fancier subscribeBy since you only care about the next event for now. But, when you run, still nothing shows up in IntelliJ IDEA’s output console. Isn’t this fun? You’re going to learn about the different subjects shortly.

What’s happening here is that a PublishSubject only emits to current subscribers. So if you weren’t subscribed when something was added to it previously, you don’t get it when you do subscribe. Think of the tree-falling analogy. If a tree falls and no one’s there to hear it, does that make your illegal logging business a success? :]

To fix things, add this code to the end of the example:

publishSubject.onNext(1)

Notice that, because you defined the publish subject to be of type Int, only integers may be sent into it.

Now, because publishSubject has a subscriber, it will emit that integer:

--- Example of: PublishSubject ---
1

In a similar fashion to the subscribe parameters, onNext is how you add a new next event into a subject, passing the element as the parameter:

publishSubject.onNext(2)

Now the 2 is printed as well:

--- Example of: PublishSubject ---
1
2

With that gentle intro, now it’s time to learn all about subjects.

What are subjects?

Subjects act as both an Observable and an observer. You saw earlier how they can receive events and also be subscribed to. The subject received next events, and each time it received an event, it turned around and emitted it to its subscriber.

There are four subject types in RxJava:

  • PublishSubject: Starts empty and only emits new elements to subscribers.
  • BehaviorSubject: Starts with an optional initial value and replays it or the latest element to new subscribers.
  • ReplaySubject: Initialized with a buffer size and will maintain a buffer of elements up to that size and replay it to new subscribers.
  • AsyncSubject: Starts empty and only emits the last item it receives before it’s completed to subscribers.

Taking on each of these in turn, you’re going to learn a lot more about subjects and how to work with them next.

Working with publish subjects

Publish subjects come in handy when you simply want subscribers to be notified of new events from the point at which they subscribed, until they either unsubscribe, or the subject has terminated with a complete or error event.

In the following marble diagram, the top line is the publish subject and the second and third lines are subscribers. The upward-pointing arrows indicate subscriptions, and the downward-pointing arrows represent emitted events.

The first subscriber subscribes after 1, so it doesn’t receive that event. It does get 2 and 3, though. And because the second subscriber doesn’t join in on the fun until after 2, it only gets 3.

Returning to the project, add this code to the bottom of the same example:

val subscriptionTwo = publishSubject
  .subscribe { int ->
    printWithLabel("2)", int)
  }

printWithLabel is a simple helper function that — you guessed it — prints a label and a corresponding value. In the example above, "2)" is the label.

As expected, subscriptionTwo doesn’t print anything out yet because it subscribed after the 1 and 2 were emitted. Now, enter this code:

publishSubject.onNext(3)

The 3 is printed twice, once for subscriptionOne and once for subscriptionTwo.

1
2
3
2) 3

Add this code to terminate subscriptionOne and then add another .next event onto the subject:

subscriptionOne.dispose()

publishSubject.onNext(4)

The value 4 is only printed for subscription 2), because subscriptionOne was disposed.

1
2
3
2) 3
2) 4

When a publish subject receives a completed or error event, also known as a terminal event, it will emit that terminal event to new subscribers and it will no longer emit next events. However, it will re-emit its terminal event to future subscribers. Add this code to the example:


// 1
publishSubject.onComplete()

// 2
publishSubject.onNext(5)

// 3
subscriptionTwo.dispose()

// 4
val subscriptionThree = publishSubject.subscribeBy(
    onNext = { printWithLabel("3)", it) },
    onComplete = { printWithLabel("3)", "Complete") }
)

publishSubject.onNext(6)

Here’s what you do with the code above:

  1. Send the complete event through the subject via the onComplete method. This effectively terminates the subject’s observable sequence.
  2. Send another element 5 into the subject. This won’t be emitted and printed, though, because the subject has already terminated.
  3. Don’t forget to dispose of subscriptions when you’re done!
  4. Create a new subscription to the subject, using the subscribeBy method to listen for the onComplete event.

Maybe the new subscriber subscriptionThree will kickstart the subject back into action? Nope, but you do still get the complete event:

...
3) Complete

Actually, every subject type, once terminated, will re-emit its stop event to future subscribers. So it’s a good idea to include handlers for stop events in your code, not just to be notified when it terminates, but also in case it is already terminated when you subscribe to it.

You might use a publish subject when you’re modeling time-sensitive data, such as in an online bidding app. It wouldn’t make sense to alert the user who joined at 10:01 am that at 9:59 a.m. there was only one minute left in the auction. That is, of course, unless you like one-star reviews to your bidding app.

Sometimes, you want to let new subscribers know what the latest element value is, even though that element was emitted before the subscription. For that, you’ve got some options.

Working with behavior subjects

Behavior subjects work similarly to publish subjects, except they will replay the latest next event to new subscribers. Check out this marble diagram:

The first line from the top is the subject. The first subscriber on the second line down subscribes after 1 but before 2, so it gets 1 immediately upon subscription, and then 2 and 3 as they’re emitted by the subject. Similarly, the second subscriber subscribes after 2 but before 3, so it gets 2 immediately and then 3 when it’s emitted.

Add this new example to your project:

// 1
exampleOf("BehaviorSubject") {
  // 2
  val subscriptions = CompositeDisposable()
  // 3
  val behaviorSubject =
      BehaviorSubject.createDefault("Initial value")
}

Here’s the play-by-play:

  1. Start a new BehaviorSubject example.
  2. Create a CompositeDisposable, which you’ll use later on.
  3. Create a new BehaviorSubject using the static factory method createDefault, which takes an initial value to be immediately emitted.

Note: BehaviorSubject can also be initialized without an initial value. You can use the create static factory method to make one without an initial value.

Now, add the following code to the example:

val subscriptionOne = behaviorSubject.subscribeBy(
  onNext = { printWithLabel("1)", it) },
  onError = { printWithLabel("1)", it) }
)

This creates a subscription to the subject, but the subscription was created after the subject was. No other elements have been added to the subject, so it replays the initial value to the subscriber.

--- Example of: BehaviorSubject ---
1) Initial value

Now, insert the following code right before the previous subscription code, but after the definition of the subject:

behaviorSubject.onNext("X")

The X is printed, because now it’s the latest element when the subscription is made:

--- Example of: BehaviorSubject ---
1) X

Add the following code to the end of the example — but, first, look it over and see if you can determine what will be printed:

// 1
behaviorSubject.onError(RuntimeException("Error!"))
// 2
subscriptions.add(behaviorSubject.subscribeBy(
  onNext = { printWithLabel("2)", it) },
  onError = { printWithLabel("2)", it) }
))

Taking it section-by-section:

  1. Add a RuntimeException error event into the subject.
  2. Create a new subscription to the subject.

Did you figure out that the error event will be printed twice, once for each subscription? If so, right on!

1) X
1) java.lang.RuntimeException: Error!
2) java.lang.RuntimeException: Error!

Another benefit of using a BehaviorSubject is it allows you to access whatever its latest value is imperatively. Add the code below to create another example:

exampleOf("BehaviorSubject State") {

  val subscriptions = CompositeDisposable()
  val behaviorSubject = BehaviorSubject.createDefault(0)

  println(behaviorSubject.value)
}

After running the example, you should see the following:

--- Example of: BehaviorSubject State ---
0

BehaviorSubjects allow you to reference their last emitted value — notice the behaviorSubject.value call in the last line of the example.

Add the following to the example:

// 1
subscriptions.add(behaviorSubject.subscribeBy {
  printWithLabel("1)", it)
})

// 2
behaviorSubject.onNext(1)
// 3
println(behaviorSubject.value)
// 4
subscriptions.dispose()

Let’s break the above down section by section:

  1. Subscribe to the BehaviorSubject and add its disposable to a CompositeDisposable so you can dispose of it later.
  2. Call onNext sending another value into the subject.
  3. Print whatever the current value of the subject is.
  4. Dispose the subscriptions.

Using the value in a BehaviorSubject can help you bridge the gap between the Rx world and the non-Rx world!

Behavior subjects are useful when you want to pre-populate a view with the most recent data. For example, you could bind controls in a user profile screen to a behavior subject, so that the latest values can be used to pre-populate the display while the app fetches fresh data.

But what if you wanted to show more than the latest value? For example, on a search screen, you may want to show the most recent five search terms used. This is where replay subjects come in.

Working with replay subjects

Replay subjects will temporarily cache — or buffer — the latest elements they emit, up to a specified size of your choosing. They will then replay that buffer to new subscribers.

The following marble diagram depicts a replay subject with a buffer size of 2. The first subscriber (middle line) is already subscribed to the replay subject (top line) so it gets elements as they’re emitted. The second subscriber (bottom line) subscribes after 2, so it gets 1 and 2 replayed to it.

Keep in mind that, when using a replay subject, this buffer is held in memory. You can definitely shoot yourself in the foot, here, if you set a large buffer size for a replay subject of some type whose instances each take up a lot of memory — like images. Another thing to watch out for is creating a replay subject of a list of items. Each emitted element will be a list, so the buffer size will buffer that many lists. It would be easy to create memory pressure here if you’re not careful.

Add this new example to your file:

exampleOf("ReplaySubject") {

  val subscriptions = CompositeDisposable()
  // 1
  val replaySubject = ReplaySubject.createWithSize<String>(2)
  // 2
  replaySubject.onNext("1")

  replaySubject.onNext("2")

  replaySubject.onNext("3")
  // 3
  subscriptions.add(replaySubject.subscribeBy(
     onNext = { printWithLabel("1)", it) },
     onError = { printWithLabel("1)", it)}
  ))

  subscriptions.add(replaySubject.subscribeBy(
    onNext = { printWithLabel("2)", it) },
    onError = { printWithLabel("2)", it)}
  ))
}

From the top:

  1. You create a new replay subject with a buffer size of 2. Replay subjects are initialized using the static method createWithSize.
  2. Add three elements onto the subject.
  3. Create two subscriptions to the subject.

The latest two elements are replayed to both subscribers. 1 never gets emitted, because 2 and 3 were added onto the replay subject with a buffer size of 2 before anything subscribed to it:

--- Example of: ReplaySubject ---
1) 2
1) 3
2) 2
2) 3

Now, add the following code to the example:

replaySubject.onNext("4")

subscriptions.add(replaySubject.subscribeBy(
  onNext = { printWithLabel("3)", it) },
  onError = { printWithLabel("3)", it)}
))

With this code, you add another element into the subject, and then create a new subscription to it. The first two subscriptions will receive that element as normal because they were already subscribed when the new element was added to the subject, while the new third subscriber will get the last two buffered elements replayed to it:

...
1) 4
2) 4
3) 3
3) 4

You’re getting pretty good at this stuff by now, so there should be no surprises, here. What would happen if you threw a wrench into the works here? Add this line of code right after adding 4 onto the subject, before creating the third subscription:

replaySubject.onError(RuntimeException("Error!"))

This may surprise you. And if so, that’s OK. Life’s full of surprises:

1) 4
2) 4
1) java.lang.RuntimeException: Error!
2) java.lang.RuntimeException: Error!
3) 3
3) 4
3) java.lang.RuntimeException: Error!

What’s going on here? The replay subject is terminated with an error, which it will re-emit to new subscribers as you’ve already seen subjects do. But the buffer is also still hanging around, so it gets replayed to new subscribers as well, before the stop event is re-emitted.

Working with async subjects

The last type of subject in the RxJava arsenal is the AsyncSubject. Async subjects are a bit stranger and definitely a bit rarer than the other types of subjects you’ve encountered, but they’re still valuable. Here’s the lowdown.

An AsyncSubject will only ever emit the last value it received before it’s complete. So if you pass several values into an AsyncSubject and then call onComplete on it, subscribers will only see the last value you passed into the subject and then a complete event. If the subject receives an error event, subscribers will see nothing!

The following marble diagram demonstrates the above. The first line is the AsyncSubject — it gets a 1 value, a 2 value, and a 3 value, and then completes (denoted by the vertical bar after the 3 value).

The other two lines are subscribers, and they only receive the last value, the 3 value, before also getting a complete event.

Add the following example in your project:

exampleOf("AsyncSubject") {
  val subscriptions = CompositeDisposable()
  // 1
  val asyncSubject = AsyncSubject.create<Int>()
  // 2
  subscriptions.add(asyncSubject.subscribeBy(
     onNext = { printWithLabel("1)", it) },
     onComplete = { printWithLabel("1)", "Complete") }
   ))
  // 3
   asyncSubject.onNext(0)
   asyncSubject.onNext(1)
   asyncSubject.onNext(2)
   // 4
   asyncSubject.onComplete()

   subscriptions.dispose()
}

Taking things step by step, again:

  1. Build an AsyncSubject that will handle Ints.
  2. Subscribe to the subject, printing out both next events and complete events.
  3. Send three values into the subject: 0, 1, and 2.
  4. complete the subject.

What kind of output would you expect to get? Run the project and you should see the following:

--- Example of: AsyncSubject ---
1) 2
1) Complete

Since 2 was the last element sent into the subject before it completed, the subscriber only sees 2 before it receives the complete event.

AsyncSubjects definitely take a back seat to some of the other subjects you’ve seen in this chapter, but they can be super useful in the right scenario! For example, imagine you have a game summary screen that you want to update with the final values of some game. An AsyncSubject would be perfect to listen for score changes, since the only score you care about is the last one before the game finishes and the subject completes!

Working with the RxRelay library

Subjects are fantastic — but, sometimes, they don’t quite get it right. You’ll often want to represent an infinite stream that will never terminate. That means it will never send a complete event or an error event. For example, say you have a subject that pipes through the current user of your app, so you can update a profile page when a new user logs in. As long as your app is alive, that stream should be active!

If you use a normal subject, someone could inadvertently call onComplete or onError, thus terminating the stream. That means that when Dave logs out of your app and Susy logs in, she’ll see all of Dave’s profile information. Sounds like trouble waiting to happen!

Enter the RxRelay library. RxRelay mimics all of the subjects you’ve come to know and love, but without the option of calling onComplete or onError. Add the following to your project and don’t worry about the compiler error:

exampleOf("RxRelay") {
  val subscriptions = CompositeDisposable()

  val publishRelay = PublishRelay.create<Int>()

  subscriptions.add(publishRelay.subscribeBy(
    onNext = { printWithLabel("1)", it) }
  ))

  publishRelay.accept(1)
  publishRelay.accept(2)
  publishRelay.accept(3)
}

In the above example, you’re using a PublishRelay instead of a PublishSubject. Most things about the relay are the same — just like in a PublishSubject, a subscriber will only receive elements after they subscribe. But by using a PublishRelay you guarantee no one else in the codebase will call onComplete or onError in the stream.

Including the RxRelay library is easy — just add the following to your build.gradle file, then run Gradle sync:

implementation 'com.jakewharton.rxrelay3:rxrelay:3.0.0'

You can now run the example and see the result:

--- Example of: RxRelay ---
1) 1
1) 2
1) 3

RxRelay comes with a replacement relay for PublishSubject, BehaviorSubject, and ReplaySubject. There’s no relay version of AsyncSubject since it depends on the subject receiving a complete event — so it wouldn’t make sense in Relay land!

Challenge

Challenge: Create a blackjack card dealer using a publish subject

Put your new super subject skills to the test by completing this challenge. There are start and end versions for each challenge in the chapter downloads.

In case you’re not familiar with it, blackjack is a card game where the goal is to get 21 or as close as possible without going over, which is called getting “busted.”

The starter project for this challenge implements a publish subject to model a hand of cards. To do so, the type of the subject is a list of pairs of String and Int to store the suit and the card value.

val dealtHand = PublishSubject.create<List<Pair<String, Int>>>()

Aces are high, so an Ace of Spades has a value of 11 compared to a Queen of Hearts which has a value of 10.

In the SupportCode.kt file for this challenge, there is a cards list of pairs of String and Int to represent a standard deck of 52 cards.

val cards = mutableListOf(
    Pair("🂡", 11), Pair("🂢", 2),Pair("🂣", 3), ...)

There’s also a couple of functions here. cardString() will take a list of card pairs and extract just the strings representing the cards.

fun cardString(hand: List<Pair<String, Int>>): String {
  return hand.joinToString("") { it.first }
}

And points() tallies up the points for the passed in list of card pairs.

Remember that you can’t go over 21 points or else you’re busted, so there’s also an error sealed class to model that.

sealed class HandError: Throwable() {
  class Busted: HandError()
}

And there’s an extension function on IntRange to get a random Int in a range:

fun IntRange.random() =
    Random().nextInt(endInclusive - start) +  start

OK, back in the main function, you have two tasks.

The first is to add code below the comment “Add code to update dealtHand here” that will evaluate the result returned from calling points(), passing the hand list. If the result is greater than 21, add the error HandError.Busted() onto dealtHand. Otherwise, add the hand onto dealtHand as a next event.

Your second task is to subscribe to the dealtHand right below the comment indicating to do that. Handle both the next and error events. For next events, you can just print out the result from calling the cardString() and points() functions from the support code. And for an error event, just print out the error.

Alright that’s it. Good luck on this challenge!

Key points

  • Subjects are Observables that are also observers.
  • You can send events over subjects by using onNext, onError and onComplete.
  • PublishSubject is used when you only want to receive events that occur after you’ve subscribed.
  • BehaviorSubject will relay the latest event that has occurred when you subscribe, including an optional initial value.
  • ReplaySubject will buffer a configurable number of events that get replayed to new subscribers. You must watch out for buffering too much data in a replay subject.
  • AsyncSubject only sends subscribers the most recent next event upon a complete event occurring.
  • The RxRelay library can be used with relays in place of subjects, to prevent accidental complete and error events to be sent.

Where to go from here?

You’ve now learned about Observables and observers, and seen how to combine them into a single type called a subject.

Now it’s time to put all you’ve learned into practice in an Android app. You’ll start to do so in the next chapter!

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.

Have feedback to share about the online reading experience? If you have feedback about the UI, UX, highlighting, or other features of our online readers, you can send them to the design team with the form below:

© 2021 Razeware LLC