Working with RxJava Disposables in Kotlin

In this tutorial, you’ll learn the basics of RxJava Disposables. You will be building an app from scratch that retrieves real-time cryptocurrency data. By Lawrence Tan 🇸🇬.

Leave a rating/review
Download materials
Save for later
Share
You are currently viewing page 2 of 3 of this article. Click here to view the first page.

Calling CryptoCompare API

Start by adding the API key you received from CryptoCompare in CryptoDataAPI.kt:

const val APIKEY = "APIKEY" // TODO 1: Add Your Register API Key Here
...

According to the documentation here, you’ll use Multiple Symbols Price to populate live data. Update the getCryptoData function as follows:

interface CryptoDataAPI {
  ...
  //TODO 2: Declare the function to return an Observable
  fun getCryptoData(@Query("tsyms") currencies: String): Observable<LinkedTreeMap<Object, Object>>
}

Here, the data you receive is an Observable of type LinkedTreeMap. If you run the app now, nothing will happen.

Be patient! You’re wiring much of the internal logic. Exciting things are coming ahead!

Now, head over to CryptoDataRepository.kt and create a function here to make the network call.

  //TODO 3: Create a function to call API and return an Observable
  fun getCryptoData(currencies: String): Observable<LinkedTreeMap<Object, Object>> {
    return cryptoDataAPI.getCryptoData(currencies)
        .doOnNext {
          Log.d("getCryptoData", "Dispatching ${it.size} crypto data from API...")
        }
  }

This helper class uses doOnNext() to log the data size and handle any related logic. Then it passes it to the next function.

Remember the ViewModel? Currently, you have routed the crypto data from Server -> API Call -> Data Repository. Now, you’re going to punch this data into the ViewModels, making them ready for display.

Now, create a function again here:

  //TODO 4: Implement Function to Pass Data to ViewModel
  fun getCryptoData(currencies: String): Observable<List<CryptoData>> {
    return cryptoDataRepository.getCryptoData(currencies)
        // 1
        .map {
          handleResult(it)
        }
        // 2
        .onErrorReturn {
          Log.d("getCryptoData", "An error occurred")
          arrayListOf<CryptoData>().toList()
        }
  }

In this code, you:
1. This function calls another function that is already implemented for you in handleResult(). It takes in server data it, then unwraps it into CryptoData models and returns them as a list. In other words, here you’ve converted the original LinkedTreeMap into an ArrayList.

2. Do your error handling in this handler. For simplicity, you log a message and return an empty list of data.

Magic Hat

Phew, that’s a lot of code for such a simple app! You’re doing well and getting near the Magic Hat!

Now, build and run. You’re not calling the functions you created here yet. That’s what you’ll do next.

Polling With Observables

Now, head over to the final file you’ll work on, the BaseFragment.kt. First, declare two constant values at the top of the file:

//TODO 5: Constant Values for Initial Delay and Interval
const val INITIAL_DELAY_IN_MILLISECONDS: Long = 1000
const val INTERVAL_IN_MILLISECONDS: Long = 10000

The first constant sets the initial delay before making the first network call. The second constant sets the interval between each network call.

Next, locate the function call loadData(). Update it to get the network calls to work:

  private fun loadData() {
    //TODO 6: Call API using Observable
    Log.d("loadData", "Downloading Data ...")

    // 1
    val disposable = Observable.interval(INITIAL_DELAY_IN_MILLISECONDS, INTERVAL_IN_MILLISECONDS,
        TimeUnit.MILLISECONDS)
        // 2
        .observeOn(AndroidSchedulers.mainThread())
        // 3
        .subscribe(this::updateCryptoData, this::onError)

    //TODO 12: Add Disposables
  }

What this code does is:
1. You use an Observable.interval here along with the constants to create the polling logic. INITIAL_DELAY_IN_MILLISECONDS is the amount of time taken to start the first emission. INTERVAL_IN_MILLISECONDS is the breathing time before next emission. TimeUnit.MILLISECONDS sets the unit of the time values.
2. Always observe on AndroidSchedulers.mainThread() because that is where UI and user interactions reside.
3. Finally, here you used the double colon (::) operator to trigger a method reference to updateCryptoData and onError. It will initiate the polling subscription via subscribe method. This function empowers the app now to be able to call updateCryptoData at an interval infinitely.

You’ll realise that some functions are not yet available. Don’t worry, just keep following :].

Now, let’s look at updateCryptoData and implement it:

  //TODO 7: Add Update Crypto Data
  private fun updateCryptoData(aLong: Long) {
    // 1
    mSwipeRefreshLayout.isRefreshing = true
    // 2
    val observable: Observable<List<CryptoData>> = viewModel.getCryptoData(currencies)
    // 3
    observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        .subscribe({
          Log.d("updateCryptoData", "Received UIModel $it users.")
          handleResponse(it)
        }, {
          handleError(it)
        })
  }

In the snippet above:
1. Firstly, mSwipeRefreshLayout.isRefreshing = true sets the state of the layout to start refreshing.
2. You declared a new observable which is a list of observables with the crypto data retrieved from the API.
3. You subscribed to our observable on a pre-created idle thread using Schedulers.io(). Next, observe on the main thread so that UI can be updated promptly. Then, for each emission, you passed it to handleRespones to perform the final parsing. Finally, use handleError to handle any errors.

  //TODO 8: Add onError
  private fun onError(throwable: Throwable) {
    Log.d("onError", "OnError in Observable Time: $throwable")
  }
  //TODO 9: Handle API Response & Error
  private fun handleResponse(cryptoDataList: List<CryptoData>) {
    // 1
    cryptoDataAdapter = CryptoDataAdapter(ArrayList(cryptoDataList), this)
    cryptocurrencyList.adapter = cryptoDataAdapter
    // 2
    mSwipeRefreshLayout.isRefreshing = false

    //TODO 12: Add Disposables
  }
  // 3
  private fun handleError(t: Throwable) {
    Log.d("handlleError", "Error: $t")
  }

In this code, you:
1. Remember the function you created in the ViewModel? It returns an ArrayList of data models.
2. Then, set mSwipeRefreshLayout.isRefreshing to false to end refreshing state.
3. Finally, you log our errors if any.

Next, add these codes to navigate to each currency’s web contents in onItemClick:

  override fun onItemClick(retroCrypto: CryptoData) {
    //TODO 10: Handle Item Click
    val intent = Intent(activity, DetailActivity::class.java)
    intent.putExtra("CryptoName", retroCrypto.name)
    startActivity(intent)
  }

Here, you created a new basic DetailActivity Intent with the name of the Cryptocurrency, so that the app can open the corresponding webpage.

Now, build and run. Hurray! Everything is working fine now. The app is actually polling every ten seconds to get live data.

Try now hitting the home button. Oops! If you look at the console log, it seems like the app is still firing although the app is in the background.

Go back into the app and try to tap on a currency to view its web content. The polling doesn’t stop. You need to stop this or the app will start consuming data, depleting the battery and, worse of all, a user might uninstall this app!

Disposables to the Rescue!

Declare a CompositeDisposable in BaseFragment and start collecting resources you want to manage:

  //TODO 11: Declare Disposables
  private val disposables = CompositeDisposable()

Here you created a disposables-collector to keep track of the Observable used in this app. The app has two fragments. Each fragment creates an Observable and two Disposables. Time to manage them!

First, in loadData(), update the implementation as such:

  private fun loadData() {
    //TODO 6: Call API using Observable
    Log.d("loadData", "Downloading Data ...")
    // 1
    val disposable = Observable.interval(INITIAL_DELAY_IN_MILLISECONDS, INTERVAL_IN_MILLISECONDS,
        TimeUnit.MILLISECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::updateCryptoData, this::onError)

    //TODO 12: Add Disposables
    Log.d("loadData", "Disposable added!")
    // 2
    disposables.add(disposable)
  }

In those lines:
1. Everything there remains the same.
2. When you call subscribe(), a Disposable is returned. You'll use logging to keep track of the number of Disposables that are added. You will also add the first disposable to our CompositeDisposable variable.

Now you need to track the number of Disposables in handleResponse():

  private fun handleResponse(cryptoDataList: List<CryptoData>) {
    // 1
    cryptoDataAdapter = CryptoDataAdapter(ArrayList(cryptoDataList), this)
    cryptocurrencyList.adapter = cryptoDataAdapter

    mSwipeRefreshLayout.isRefreshing = false
    // 2    
    Log.d("handleResponse", "We have ${disposables.size()} disposables")
    ...
  }

The above code performs the following:
1. Everything here remains the same.
2. By calling size(), you get the number of Disposables and log it here.

Remember that Disposables are meant to be disposed of at correct places. You don't want the app to poll when it's no longer in use or when the user navigates out of the page. So, add them to these places:

  override fun onPause() {
    super.onPause()

    //TODO 13: Clear Disposables
    disposables.clear()

    Log.d("onPause", "Clear Disposables")
  }

  override fun onStop() {
    super.onStop()

    //TODO 14: Clear Disposables
    disposables.clear()

    Log.d("onStop", "Clear Disposables")
  }

Finally, build and run. Now, when you navigate to the web contents or leave the app, you should see Clear Disposables and the polling should stop. When you return to the app, the Disposables are added again and the polling resumes.

You've mastered working with Disposables. :)