Bridging RxJava and LiveData

Of course, the Architecture Components have Lifecycle, LifecycleOwner, and related classes for performing operations when certain lifecycle events occur. LiveData — the Architecture Components’ counterpart to RxJava — is intrinsically lifecycle-aware.

So, another option would be to have some sort of adapter that converts RxJava into LiveData. We could then observe the LiveData, knowing that our Observer would be cleaned up automatically as part of normal lifecycle management.

Fortunately, the Architecture Components has LiveDataReactiveStreams, for converting LiveData to and from RxJava structures, as is illustrated in the Trips/RxLifecycle sample project.

LiveDataReactiveStreams is in yet another artifact, android.arch.lifecycle:reactivestreams. So, you need to request that artifact with the others that you are using:

dependencies {
  implementation "com.android.support:recyclerview-v7:28.0.0"
  implementation "com.android.support:support-core-utils:28.0.0"
  implementation "com.android.support:support-fragment:28.0.0"
  implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
  implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
  implementation 'android.arch.lifecycle:runtime:1.1.1'
  implementation 'android.arch.lifecycle:livedata:1.1.1'
  implementation 'android.arch.lifecycle:reactivestreams:1.1.1'
  implementation "android.arch.persistence.room:runtime:1.1.1"
  implementation "android.arch.persistence.room:rxjava2:1.1.1"
  annotationProcessor "android.arch.persistence.room:compiler:1.1.1"
  androidTestImplementation "com.android.support:support-annotations:28.0.0"
  androidTestImplementation 'com.android.support.test:rules:1.0.2'
  androidTestImplementation 'android.arch.core:core-testing:1.1.1'
  androidTestImplementation "com.android.support:support-core-utils:28.0.0"
  androidTestImplementation "com.android.support:support-compat:28.0.0"
  androidTestImplementation 'android.arch.lifecycle:runtime:1.1.1'
  androidTestImplementation 'android.arch.lifecycle:common:1.1.1'
}

From RxJava to LiveData

To bridge from RxJava to LiveData, LiveDataReactiveStreams offers a fromPublisher() method. Here, “publisher” refers to Publisher from the Reactive Streams initiative. Most RxJava Observable types do not implement the Publisher interface, but Flowable does. And most RxJava Observable types can be converted to a Flowable via the toFlowable() method.

As a result, the recipe for using LiveDataReactiveStreams is:

  @Override
  public void onViewCreated(View view, Bundle savedInstanceState) {
    super.onViewCreated(view, savedInstanceState);

    setLayoutManager(new LinearLayoutManager(getActivity()));
    getRecyclerView()
      .addItemDecoration(new DividerItemDecoration(getActivity(),
        LinearLayoutManager.VERTICAL));

    TripStore store=TripDatabase.get(getActivity()).tripStore();

    Flowable<List<Trip>> trips=store.maybeAllTrips()
      .subscribeOn(Schedulers.single())
      .observeOn(AndroidSchedulers.mainThread())
      .toFlowable();

    LiveDataReactiveStreams.fromPublisher(trips)
      .observe(this, this::setAdapter);
  }

Here, we do this work inside of TripsFragment, which is a LifecycleOwner courtesy of the support-fragment implementation of Fragment. We do not have to worry about cleaning up the LiveData ourselves; the lifecycle system will handle this for us.

However, this comes with a significant side effect: on a configuration change, you may be handed data twice:

For example, suppose that you have a method that returns a Single like this:

Single<Set<String>> loadStringSet() {
  return Single.create(emitter -> {
    SharedPreferences prefs=PreferenceManager.getDefaultSharedPreferences(ctxt);

    emitter.onSuccess(prefs.getStringSet(PREF_STUFF, Collections.emptySet()));
  });
}

Here, we are loading a Set of String objects from SharedPreferences. Since on the first load of data from SharedPreferences, SharedPreferences performs disk I/O, it is safest (in terms of avoiding jank) to always access SharedPreferences on a background thread.

If we use LiveDataReactiveStreams.fromPublisher() to wrap that Single in a LiveData object, the first observer of the LiveData will trigger the Single to load its data. When a configuration change occurs, the LiveData wrapper around the Single unsubscribes from the Single, then re-subscribes when the new activity/fragment instance starts observing the LiveData again. For a cold observable like this one, this causes our lambda expression to be re-evaluated. Both that newly-generated result and the previously-cached result from the LiveData will be delivered to our activity’s (or fragment’s) observer. The observer needs to be aware of this and take appropriate steps: it needs to replace any existing data, not add to it.

From LiveData to RxJava

If, for some reason, you need to convert a LiveData to something in the RxJava space, toPublisher() on LiveDataReactiveStreams can adapt a LiveData to a Publisher. On its own, Publisher only offers a subscribe() method. However, Observable.fromPublisher() can adapt a Publisher into an Observable, and from there you can set up RxJava chains as needed.


Prev Table of Contents Next

This book is licensed under the Creative Commons Attribution-ShareAlike 4.0 International license.