Working with RxJava Disposables in Kotlin

In this tutorial, you’ll learn the basics of RxJava Disposables. You will be building an app from scratch that retrieves real-time cryptocurrency data.

Version

  • Kotlin 1.3, Android 6.0, Android Studio 3

Note: This tutorial assumes you have some experience using RxJava. If you don’t, make a pit stop at this tutorial first before proceeding.

Reactive Android programming builds on three libraries: RxJava, RxAndroid and RxKotlin. RxJava is the most popular reactive programming library in the Android development world. It reduces many threading headaches and makes code more intuitive.

RxAndroid is a lightweight module that binds Android-specific components to RxJava classes. RxKotlin makes writing RxJava code easier by providing convenient extension functions.

RxAssemble! You’ll use the basic functions of these three libraries to build the CryptoMe app in this tutorial.

Disposables

What are Disposables? Disposable in plain English implies short-term convenience. This also means they are short lived or meant to be discarded after use. The same idea is conveyed in RxJava’s Disposables.

When an Observer subscribes to an Observable (the emitter), you create a stream. This stream takes up resources, which later become disposable “solid waste”. You need to handle this waste if the stream is going to run for a long time.

A finite Observable signals the end of its stream by calling the Observer’s onComplete(), at which point the stream’s resources are released for you. Often, though, you’ll find it more convenient to be able to cancel a subscription yourself at any time.

Today, you’ll tackle a more complex case. The stream will run endlessly, and you’ll use Disposables to shut it down and prevent memory leaks.

A Disposable represents the link between an Observable and an Observer. A quick check of the documentation shows that it has two methods, dispose() and isDisposed(). The former disposes of the link, while the latter checks whether the link has been disposed of.
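
As a minimal sketch of those two methods, assuming RxJava 2 (the io.reactivex packages used throughout this tutorial), the following subscribes to a stream that never emits and reads isDisposed() before and after calling dispose(). The function name disposeStates is made up for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

// Subscribes to a stream that never emits, then disposes of the link.
// Returns the isDisposed() value observed before and after dispose().
fun disposeStates(): Pair<Boolean, Boolean> {
    val disposable: Disposable = Observable.never<Long>().subscribe()
    val before = disposable.isDisposed
    disposable.dispose()
    val after = disposable.isDisposed
    return before to after
}

fun main() {
    val (before, after) = disposeStates()
    println("before=$before, after=$after")
}
```

The link reports itself as live until dispose() is called and as disposed afterwards.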

Testing Disposables

When you establish a subscription between Observable and Observer, it returns a Disposable. For example, take a look at this code:

import android.util.Log
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

object DisposableTester {

    @JvmStatic fun main() {
        val seconds = Observable.interval(1, TimeUnit.SECONDS)
        val disposable = seconds.subscribe({ l -> logData(l) })

        //sleep 10 seconds
        sleep(10000)

        //Dispose and stop emissions
        disposable.dispose()

        Log.d("Test","Disposed!")

        //Sleep 10 seconds to prove
        //There are no more emissions
        sleep(10000)
    }

    private fun logData(l: Long) {
        Log.d("Test","Received: " + l)
    }

    private fun sleep(millis:Int) {
        try {
            Thread.sleep(millis.toLong())
        }
        catch (e:InterruptedException) {
            e.printStackTrace()
        }
    }

}

Here, an Observable emits a value every second. After ten seconds of emissions, the code disposes of the Disposable returned from subscribe() by explicitly calling dispose(). It then sleeps for another ten seconds to verify that no further emissions arrive.

Handling Disposables from outside the Observer is one way to dispose of resources that are no longer needed. Since RxJava 2.0, the Observer itself receives the Disposable in onSubscribe() and can dispose of the subscription at any time. For example:

import io.reactivex.Observer
import io.reactivex.disposables.Disposable

object DisposableTester {

    var myObserver: Observer<Int> = object: Observer<Int> {
        private var disposable: Disposable? = null

        override fun onSubscribe(disposable: Disposable) {
            this.disposable = disposable
        }
        override fun onNext(value:Int) {
            //Has access to Disposable
        }
        
        override fun onError(e:Throwable) {
            //Has access to Disposable
        }
        
        override fun onComplete() {
            //Has access to Disposable
        }
    }

}

If at any time the emissions are no longer required in onNext(), onError() or onComplete(), you can stop them by calling dispose() on the stored Disposable.
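
To illustrate, here is one way such an Observer could stop its own stream from inside onNext(). This is a sketch under stated assumptions rather than part of the CryptoMe app: the class name TakeUntilLimitObserver and its limit parameter are hypothetical, and the source is a simple Observable.range().

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Hypothetical Observer that collects values until `limit` items have
// arrived, then disposes of its own subscription.
class TakeUntilLimitObserver(private val limit: Int) : Observer<Int> {
    val received = mutableListOf<Int>()
    var completed = false
        private set
    private var disposable: Disposable? = null

    override fun onSubscribe(d: Disposable) {
        // Keep the Disposable so the other callbacks can use it.
        disposable = d
    }

    override fun onNext(value: Int) {
        received.add(value)
        if (received.size == limit) {
            // No more emissions are needed: cut the link.
            disposable?.dispose()
        }
    }

    override fun onError(e: Throwable) {
        disposable?.dispose()
    }

    override fun onComplete() {
        completed = true
    }
}

fun main() {
    val observer = TakeUntilLimitObserver(limit = 3)
    Observable.range(1, 10).subscribe(observer)
    println(observer.received)   // [1, 2, 3]
    println(observer.completed)  // false
}
```

Because the Observer disposes of the subscription after the third value, the remaining values from range() are never delivered and onComplete() is not called.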

CompositeDisposables

As the app evolves, there are scenarios where you need more than one subscription. Retrieving live data from multiple sources in a travel app for hotels, tours and air tickets is one great example.

You can use CompositeDisposable to manage these resources. It implements Disposable and holds a collection of Disposables. Here’s a quick example:

import android.util.Log
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import java.util.concurrent.TimeUnit

object DisposableTester {

    private val disposables = CompositeDisposable()

    @JvmStatic fun main() {
        val seconds = Observable.interval(1, TimeUnit.SECONDS)

        //Subscribe and capture disposables
        val disposable1 = seconds.subscribe({ l -> logData(l, 1) })
        val disposable2 = seconds.subscribe({ l -> logData(l, 2) })

        //Put both disposables into CompositeDisposable
        disposables.addAll(disposable1, disposable2)

        //Sleep 10 seconds
        sleep(10000)

        //Dispose all disposables
        disposables.dispose()

        Log.d("Test", ("All Disposed!"))

        //Sleep 10 seconds to prove
        //there are no more emissions
        sleep(10000)
    }

    private fun logData(l: Long, observerNumber: Int) {
        Log.d("Test", ("Observer " + observerNumber + ": " + l))
    }

    private fun sleep(millis:Int) {
        try {
            Thread.sleep(millis.toLong())
        } catch (e:InterruptedException) {
            e.printStackTrace()
        }
    }

}

This implementation relies on CompositeDisposable, a simple utility that helps you manage a collection of Disposables. After you add them with add() or addAll(), you can dispose of them all at once when they’re no longer needed.
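
The same behavior can be checked without timers or logging. The sketch below assumes RxJava 2 and uses Disposables.empty() to create placeholder Disposables that stand in for real subscriptions; the function name disposeAllDemo is made up for this illustration.

```kotlin
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposables

// Adds two placeholder Disposables to a CompositeDisposable, disposes of
// the container, and reports (size before dispose, all members disposed).
fun disposeAllDemo(): Pair<Int, Boolean> {
    val first = Disposables.empty()
    val second = Disposables.empty()

    val composite = CompositeDisposable()
    composite.addAll(first, second)
    val sizeBefore = composite.size()

    // One call disposes of every Disposable the container holds.
    composite.dispose()

    return sizeBefore to (first.isDisposed && second.isDisposed)
}

fun main() {
    val (sizeBefore, allDisposed) = disposeAllDemo()
    println("held $sizeBefore disposables, all disposed: $allDisposed")
}
```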

Congratulations! You’ve refreshed your knowledge on the basics of Reactive Programming and learned about Disposables. Now, you’re ready to embark on a journey in building the CryptoMe App!

CryptoMe App

Before you get started, take a look at what the app will look like once you’ve completed this tutorial.

The main page has two tabs. The Western Tab loads Crypto data from Western countries while the Eastern Tab loads Crypto data from Eastern countries.

The data comes from CryptoCompare API. You can sign up here for a free key with limited calls. The app polls for updated data every ten seconds.

By tapping on any Cryptocurrency, the app brings you to its web contents. You’ll learn to use Disposables to manage resources in this app.

Are you ready? Time to get started!

Getting Started

To get started, click on the Download Materials button at the top or bottom of this tutorial. First, explore the starter project in Android Studio 3.4.1 or above.

You’ll work on BaseFragment.kt where most of the network handling logic is. You’ll also implement some configurations in CryptoDataRepository.kt and CryptoDataViewModel.kt.

Take a look at some of the files. In App.kt, you’ll find Retrofit is the main network utility the app uses. In this file, you initialize Retrofit and use CryptoDataAPI to manage API calls.

CryptoDataRepository is a class that accepts an object implementing CryptoDataAPI. This class helps handle specific server data logic.

When the Observable receives data, the app processes it and passes it to CryptoDataViewModel. The ViewModel updates the RecyclerView, which displays the latest crypto data to users.

The MainActivity contains a TabLayout, with each tab holding a single Fragment. HomeFragment and SecondFragment both inherit from BaseFragment, where most of the data processing work is.

FragmentAdapter helps populate the appropriate Fragment in each Tab. CryptoDataAdapter helps translate Data into RecyclerViews.

DetailActivity shows the web contents of a selected cryptocurrency.

Wow, that’s a lot to process!

Fear not! Having an organized project structure does reap benefits. You’ll see at the end how everything pieces together to build your very own CryptoMe app!

Now, build and run the project on your Android device or emulator. You should see a gorgeous empty search screen:

Calling CryptoCompare API

Start by adding the API key you received from CryptoCompare in CryptoDataAPI.kt:

const val APIKEY = "APIKEY" // TODO 1: Add Your Register API Key Here
...

According to the documentation here, you’ll use Multiple Symbols Price to populate live data. Update the getCryptoData function as follows:

interface CryptoDataAPI {
  ...
  //TODO 2: Declare the function to return an Observable
  fun getCryptoData(@Query("tsyms") currencies: String): Observable<LinkedTreeMap<Object, Object>>
}

Here, the data you receive is an Observable of type LinkedTreeMap. If you run the app now, nothing will happen.

Be patient! You’re wiring much of the internal logic. Exciting things are coming ahead!

Now, head over to CryptoDataRepository.kt and create a function here to make the network call.

  //TODO 3: Create a function to call API and return an Observable
  fun getCryptoData(currencies: String): Observable<LinkedTreeMap<Object, Object>> {
    return cryptoDataAPI.getCryptoData(currencies)
        .doOnNext {
          Log.d("getCryptoData", "Dispatching ${it.size} crypto data from API...")
        }
  }

This helper function uses doOnNext() to log the data size as a side effect. The emitted data itself passes unchanged to the next operator in the chain.
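
A small sketch, assuming RxJava 2, shows that doOnNext() only peeks at each emission. The function name doOnNextDemo and the sample values are made up for this illustration; the map() step is there only to show that downstream operators still receive the original items.

```kotlin
import io.reactivex.Observable

// Returns (items seen by doOnNext, items produced after the later map step).
fun doOnNextDemo(): Pair<List<Int>, List<Int>> {
    val seen = mutableListOf<Int>()
    val downstream = Observable.just(1, 2, 3)
        // Side effect only: record the item, do not change it.
        .doOnNext { seen.add(it) }
        // Transformation: this is where the value actually changes.
        .map { it * 10 }
        .toList()
        .blockingGet()
    return seen to downstream
}

fun main() {
    val (seen, downstream) = doOnNextDemo()
    println(seen)        // [1, 2, 3]
    println(downstream)  // [10, 20, 30]
}
```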

Remember the ViewModel? So far, you have routed the crypto data from Server -> API Call -> Data Repository. Now, you’re going to feed this data into the ViewModel, making it ready for display.

Open CryptoDataViewModel.kt and create another function there:

  //TODO 4: Implement Function to Pass Data to ViewModel
  fun getCryptoData(currencies: String): Observable<List<CryptoData>> {
    return cryptoDataRepository.getCryptoData(currencies)
        // 1
        .map {
          handleResult(it)
        }
        // 2
        .onErrorReturn {
          Log.d("getCryptoData", "An error occurred")
          arrayListOf<CryptoData>().toList()
        }
  }

In this code, you:
1. Call handleResult(), a function that is already implemented for you. It takes the server data, it, unwraps it into CryptoData models and returns them as a list. In other words, you convert the original LinkedTreeMap into a List of CryptoData.

2. Handle errors in onErrorReturn(). For simplicity, you log a message and return an empty list of data.
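
The map() plus onErrorReturn() pattern can be tried in isolation. The sketch below assumes RxJava 2; Price, toPrices and pricesOrEmpty are hypothetical stand-ins for CryptoData, handleResult() and the ViewModel function, and the sample value is a placeholder, not real market data.

```kotlin
import io.reactivex.Observable
import java.io.IOException

// Hypothetical stand-in for the tutorial's CryptoData model.
data class Price(val symbol: String, val usd: Double)

// Hypothetical stand-in for handleResult(): unwraps a raw map into models.
fun toPrices(raw: Map<String, Double>): List<Price> =
    raw.map { (symbol, usd) -> Price(symbol, usd) }

// Converts raw server data into models; on any upstream error, emits an
// empty list instead of passing the error to the subscriber.
fun pricesOrEmpty(source: Observable<Map<String, Double>>): Observable<List<Price>> =
    source
        .map { toPrices(it) }
        .onErrorReturn { emptyList<Price>() }

fun main() {
    // Placeholder value, used only to exercise the happy path.
    val ok = pricesOrEmpty(Observable.just(mapOf("BTC" to 1.0))).blockingFirst()
    val failed = pricesOrEmpty(
        Observable.error<Map<String, Double>>(IOException("offline"))
    ).blockingFirst()
    println(ok)      // [Price(symbol=BTC, usd=1.0)]
    println(failed)  // []
}
```

Note that onErrorReturn() emits the fallback value and then completes the stream, so this pattern suits one-shot calls like a single network request.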

Phew, that’s a lot of code for such a simple app! You’re doing well and getting near the Magic Hat!

Now, build and run. You’re not calling the functions you created here yet. That’s what you’ll do next.

Polling With Observables

Now, head over to the final file you’ll work on, BaseFragment.kt. First, declare two constant values at the top of the file:

//TODO 5: Constant Values for Initial Delay and Interval
const val INITIAL_DELAY_IN_MILLISECONDS: Long = 1000
const val INTERVAL_IN_MILLISECONDS: Long = 10000

The first constant sets the initial delay before making the first network call. The second constant sets the interval between each network call.

Next, locate the function loadData(). Update it to get the network calls to work:

  private fun loadData() {
    //TODO 6: Call API using Observable
    Log.d("loadData", "Downloading Data ...")

    // 1
    val disposable = Observable.interval(INITIAL_DELAY_IN_MILLISECONDS, INTERVAL_IN_MILLISECONDS,
        TimeUnit.MILLISECONDS)
        // 2
        .observeOn(AndroidSchedulers.mainThread())
        // 3
        .subscribe(this::updateCryptoData, this::onError)

    //TODO 12: Add Disposables
  }

Here’s what this code does:
1. You use Observable.interval() along with the constants to create the polling logic. INITIAL_DELAY_IN_MILLISECONDS is the delay before the first emission. INTERVAL_IN_MILLISECONDS is the period between subsequent emissions. TimeUnit.MILLISECONDS sets the unit of both time values.
2. You observe on AndroidSchedulers.mainThread() because that is where UI updates and user interactions happen.
3. Finally, you use the double colon (::) operator to pass method references to updateCryptoData and onError. Calling subscribe() starts the polling subscription, so the app now calls updateCryptoData at every interval, indefinitely.
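
The timing of Observable.interval() can be checked without waiting in real time. The sketch below assumes RxJava 2 and uses its TestScheduler as a virtual clock, which the tutorial itself does not do; the function name tickCounts is made up, and the delay and period match the two constants above.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Advances a virtual clock and records how many ticks have been emitted
// at each step: just before the initial delay, at the initial delay,
// one period later, and one more period after disposing.
fun tickCounts(): List<Int> {
    val scheduler = TestScheduler()
    val observer = Observable
        .interval(1000L, 10000L, TimeUnit.MILLISECONDS, scheduler)
        .test()
    val counts = mutableListOf<Int>()

    scheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS)
    counts.add(observer.valueCount())   // initial delay not yet elapsed

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS)
    counts.add(observer.valueCount())   // first tick after the initial delay

    scheduler.advanceTimeBy(10000, TimeUnit.MILLISECONDS)
    counts.add(observer.valueCount())   // second tick one period later

    observer.dispose()
    scheduler.advanceTimeBy(10000, TimeUnit.MILLISECONDS)
    counts.add(observer.valueCount())   // no new ticks after dispose()

    return counts
}

fun main() {
    println(tickCounts())  // [0, 1, 2, 2]
}
```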

You’ll realise that some functions are not yet available. Don’t worry, just keep following :].

Now, let’s look at updateCryptoData and implement it:

  //TODO 7: Add Update Crypto Data
  private fun updateCryptoData(aLong: Long) {
    // 1
    mSwipeRefreshLayout.isRefreshing = true
    // 2
    val observable: Observable<List<CryptoData>> = viewModel.getCryptoData(currencies)
    // 3
    observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        .subscribe({
          Log.d("updateCryptoData", "Received ${it.size} crypto data items.")
          handleResponse(it)
        }, {
          handleError(it)
        })
  }

In the snippet above:
1. First, mSwipeRefreshLayout.isRefreshing = true puts the layout into its refreshing state.
2. You declare a new Observable that emits a list of the crypto data retrieved from the API.
3. You subscribe to the Observable on a background thread from the Schedulers.io() pool. Next, you observe on the main thread so that the UI can be updated promptly. Then, you pass each emission to handleResponse to update the list. Finally, you use handleError to handle any errors.
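
To see the thread hop outside an Android project, the sketch below assumes RxJava 2 and substitutes Schedulers.single() for AndroidSchedulers.mainThread(), since the latter needs a device or emulator. The function name threadNames is made up for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Returns (name of the thread that produced the item,
//          name of the thread that consumed it).
fun threadNames(): Pair<String, String> {
    var producerThread = ""
    val consumerThread = Observable
        .fromCallable {
            // Runs on the scheduler chosen by subscribeOn().
            producerThread = Thread.currentThread().name
            "payload"
        }
        .subscribeOn(Schedulers.io())
        // Stand-in for AndroidSchedulers.mainThread() in this sketch.
        .observeOn(Schedulers.single())
        // Runs on the scheduler chosen by observeOn().
        .map { Thread.currentThread().name }
        .blockingFirst()
    return producerThread to consumerThread
}

fun main() {
    val (producer, consumer) = threadNames()
    println("produced on $producer, consumed on $consumer")
}
```

The work happens on one thread and the result is delivered on another, neither of which is the thread that called subscribe.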

Next, add the remaining functions that this code relies on:

  //TODO 8: Add onError
  private fun onError(throwable: Throwable) {
    Log.d("onError", "OnError in Observable Time: $throwable")
  }
  //TODO 9: Handle API Response & Error
  private fun handleResponse(cryptoDataList: List<CryptoData>) {
    // 1
    cryptoDataAdapter = CryptoDataAdapter(ArrayList(cryptoDataList), this)
    cryptocurrencyList.adapter = cryptoDataAdapter
    // 2
    mSwipeRefreshLayout.isRefreshing = false

    //TODO 12: Add Disposables
  }
  // 3
  private fun handleError(t: Throwable) {
    Log.d("handleError", "Error: $t")
  }

In this code, you:
1. Wrap the list returned by the ViewModel function you created earlier in a new CryptoDataAdapter and attach it to cryptocurrencyList.
2. Set mSwipeRefreshLayout.isRefreshing to false to end the refreshing state.
3. Log any errors.

Next, add this code to onItemClick to navigate to each currency’s web contents:

  override fun onItemClick(retroCrypto: CryptoData) {
    //TODO 10: Handle Item Click
    val intent = Intent(activity, DetailActivity::class.java)
    intent.putExtra("CryptoName", retroCrypto.name)
    startActivity(intent)
  }

Here, you create an Intent for DetailActivity that carries the name of the cryptocurrency, so that the app can open the corresponding webpage.

Now, build and run. Hurray! Everything is working fine now. The app is actually polling every ten seconds to get live data.

Now, try pressing the home button. Oops! If you look at the console log, you’ll see that the app is still polling even though it’s in the background.

Go back into the app and tap on a currency to view its web content. The polling doesn’t stop. You need to stop this, or the app will keep consuming data and draining the battery and, worst of all, a user might uninstall it!

Disposables to the Rescue!

Declare a CompositeDisposable in BaseFragment and start collecting resources you want to manage:

  //TODO 11: Declare Disposables
  private val disposables = CompositeDisposable()

Here you created a disposables-collector to keep track of the Observable used in this app. The app has two fragments. Each fragment creates an Observable and two Disposables. Time to manage them!

First, in loadData(), update the implementation as follows:

  private fun loadData() {
    //TODO 6: Call API using Observable
    Log.d("loadData", "Downloading Data ...")
    // 1
    val disposable = Observable.interval(INITIAL_DELAY_IN_MILLISECONDS, INTERVAL_IN_MILLISECONDS,
        TimeUnit.MILLISECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::updateCryptoData, this::onError)

    //TODO 12: Add Disposables
    Log.d("loadData", "Disposable added!")
    // 2
    disposables.add(disposable)
  }

In those lines:
1. Everything there remains the same.
2. When you call subscribe(), a Disposable is returned. You log a message each time one is added, and you add the Disposable to the CompositeDisposable variable.

Now you need to track the number of Disposables in handleResponse():

  private fun handleResponse(cryptoDataList: List<CryptoData>) {
    // 1
    cryptoDataAdapter = CryptoDataAdapter(ArrayList(cryptoDataList), this)
    cryptocurrencyList.adapter = cryptoDataAdapter

    mSwipeRefreshLayout.isRefreshing = false
    // 2    
    Log.d("handleResponse", "We have ${disposables.size()} disposables")
    ...
  }

The above code performs the following:
1. Everything here remains the same.
2. By calling size(), you get the number of Disposables and log it here.
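
Note that the subscription created inside updateCryptoData() returns its own Disposable, which the code above does not capture. One possible alternative, which is not the tutorial's approach, is to fold the network call into the polling chain with switchMap() so that a single Disposable covers both the timer and the request in flight. The sketch below assumes RxJava 2 and a TestScheduler as a virtual clock; pollWith, pollTwiceThenDispose, fetch and onResult are hypothetical names, and the fake responses stand in for the API call.

```kotlin
import io.reactivex.Observable
import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Polls `fetch` after 1 second and then every 10 seconds. switchMap()
// keeps the request inside the same chain, so disposing of the returned
// Disposable stops both the timer and any request still in flight.
fun pollWith(
    scheduler: Scheduler,
    fetch: () -> Observable<String>,
    onResult: (String) -> Unit
): Disposable =
    Observable.interval(1000L, 10000L, TimeUnit.MILLISECONDS, scheduler)
        .switchMap { fetch() }
        .subscribe({ onResult(it) }, { error -> println("Polling failed: $error") })

// Runs two polls on a virtual clock, disposes, then lets more time pass.
fun pollTwiceThenDispose(): List<String> {
    val scheduler = TestScheduler()
    val results = mutableListOf<String>()
    var calls = 0

    val disposable = pollWith(
        scheduler,
        { calls++; Observable.just("response $calls") },  // fake API call
        { results.add(it) }
    )

    scheduler.advanceTimeBy(11000, TimeUnit.MILLISECONDS)  // ticks at 1 s and 11 s
    disposable.dispose()
    scheduler.advanceTimeBy(30000, TimeUnit.MILLISECONDS)  // nothing more arrives

    return results
}

fun main() {
    println(pollTwiceThenDispose())  // [response 1, response 2]
}
```

With switchMap(), a new tick also cancels a request that has not finished yet; flatMap() would let overlapping requests run to completion instead.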

Remember that Disposables are meant to be disposed of at the correct places. You don't want the app to poll when it's no longer in use or when the user navigates out of the page. So, clear them in these places:

  override fun onPause() {
    super.onPause()

    //TODO 13: Clear Disposables
    disposables.clear()

    Log.d("onPause", "Clear Disposables")
  }

  override fun onStop() {
    super.onStop()

    //TODO 14: Clear Disposables
    disposables.clear()

    Log.d("onStop", "Clear Disposables")
  }

Finally, build and run. Now, when you navigate to the web contents or leave the app, you should see Clear Disposables and the polling should stop. When you return to the app, the Disposables are added again and the polling resumes.
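
The code above calls clear() rather than dispose(), and the difference matters when the same CompositeDisposable is reused after the fragment resumes. The sketch below assumes RxJava 2 and uses Disposables.empty() as a placeholder for a real subscription; the function names are made up for this illustration.

```kotlin
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposables

// clear() disposes of the current members but leaves the container usable.
// Returns whether a Disposable added afterwards was accepted.
fun addAfterClear(): Boolean {
    val composite = CompositeDisposable()
    composite.add(Disposables.empty())
    composite.clear()
    return composite.add(Disposables.empty())
}

// dispose() also marks the container itself as disposed. A Disposable
// added afterwards is rejected and disposed of immediately.
// Returns (accepted, late member already disposed).
fun addAfterDispose(): Pair<Boolean, Boolean> {
    val composite = CompositeDisposable()
    composite.add(Disposables.empty())
    composite.dispose()
    val late = Disposables.empty()
    val accepted = composite.add(late)
    return accepted to late.isDisposed
}

fun main() {
    println(addAfterClear())    // true
    println(addAfterDispose())  // (false, true)
}
```

Had onPause() called dispose() instead, the polling subscription added on return to the app would be disposed of as soon as it was added.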

You've mastered working with Disposables. :)

Where to Go From Here?

You can download the final project by clicking the Download Materials button at the top or bottom of this tutorial. You've learned how to work with Disposables. One of the most common ways to use them is to manage resources and prevent memory leaks.

To learn more about RxKotlin and Disposables, refer to the ReactiveX documentation.

If you have any comments or questions, don’t hesitate to join the discussion below!
