Bridging RxJava and LiveData
Of course, the Architecture Components have Lifecycle
, LifecycleOwner
, and related classes for performing operations when certain lifecycle events occur. LiveData
— the Architecture Components’ counterpart to RxJava — is intrinsically lifecycle-aware.
So, another option would be to have some sort of adapter that converts RxJava into LiveData
. We could then observe the LiveData
, knowing that our Observer
would be cleaned up automatically as part of normal lifecycle management.
Fortunately, the Architecture Components has LiveDataReactiveStreams
, for converting LiveData
to and from RxJava structures, as is illustrated in the Trips/RxLifecycle
sample project.
LiveDataReactiveStreams
is in yet another artifact, android.arch.lifecycle:reactivestreams
. So, you need to request that artifact with the others that you are using:
dependencies {
implementation "com.android.support:recyclerview-v7:28.0.0"
implementation "com.android.support:support-core-utils:28.0.0"
implementation "com.android.support:support-fragment:28.0.0"
implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'android.arch.lifecycle:runtime:1.1.1'
implementation 'android.arch.lifecycle:livedata:1.1.1'
implementation 'android.arch.lifecycle:reactivestreams:1.1.1'
implementation "android.arch.persistence.room:runtime:1.1.1"
implementation "android.arch.persistence.room:rxjava2:1.1.1"
annotationProcessor "android.arch.persistence.room:compiler:1.1.1"
androidTestImplementation "com.android.support:support-annotations:28.0.0"
androidTestImplementation 'com.android.support.test:rules:1.0.2'
androidTestImplementation 'android.arch.core:core-testing:1.1.1'
androidTestImplementation "com.android.support:support-core-utils:28.0.0"
androidTestImplementation "com.android.support:support-compat:28.0.0"
androidTestImplementation 'android.arch.lifecycle:runtime:1.1.1'
androidTestImplementation 'android.arch.lifecycle:common:1.1.1'
}
From RxJava to LiveData
To bridge from RxJava to LiveData
, LiveDataReactiveStreams
offers a fromPublisher()
method. Here, “publisher” refers to Publisher
from the Reactive Streams initiative. Most RxJava Observable
types do not implement the Publisher
interface, but Flowable
does. And most RxJava Observable
types can be converted to a Flowable
via the toFlowable()
method.
As a result, the recipe for using LiveDataReactiveStreams
is:
- Create your RxJava
Observable
as normal - Call
toFlowable()
on it to convert it into aFlowable
- Pass that
Flowable
tofromPublisher()
to get aLiveData
- Observe that
LiveData
and consume the results, such as with a method reference
@Override
public void onViewCreated(View view, Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
setLayoutManager(new LinearLayoutManager(getActivity()));
getRecyclerView()
.addItemDecoration(new DividerItemDecoration(getActivity(),
LinearLayoutManager.VERTICAL));
TripStore store=TripDatabase.get(getActivity()).tripStore();
Flowable<List<Trip>> trips=store.maybeAllTrips()
.subscribeOn(Schedulers.single())
.observeOn(AndroidSchedulers.mainThread())
.toFlowable();
LiveDataReactiveStreams.fromPublisher(trips)
.observe(this, this::setAdapter);
}
Here, we do this work inside of TripsFragment
, which is a LifecycleOwner
courtesy of the support-fragment
implementation of Fragment
. We do not have to worry about cleaning up the LiveData
ourselves; the lifecycle system will handle this for us.
However, this comes with a significant side effect: on a configuration change, you may be handed data twice:
- Once from standard
LiveData
behavior, which automatically delivers the last-emitted object when there is a new observer, and - Possibly once from your RxJava source, as
LiveDataReactiveStreams
re-subscribes to that source as part of that configuration change
For example, suppose that you have a method that returns a Single
like this:
Single<Set<String>> loadStringSet() {
return Single.create(emitter -> {
SharedPreferences prefs=PreferenceManager.getDefaultSharedPreferences(ctxt);
emitter.onSuccess(prefs.getStringSet(PREF_STUFF, Collections.emptySet()));
});
}
Here, we are loading a Set
of String
objects from SharedPreferences
. Since on the first load of data from SharedPreferences
, SharedPreferences
performs disk I/O, it is safest (in terms of avoiding jank) to always access SharedPreferences
on a background thread.
If we use LiveDataReactiveStreams.fromPublisher()
to wrap that Single
in a LiveData
object, the first observer of the LiveData
will trigger the Single
to load its data. When a configuration change occurs, the LiveData
wrapper around the Single
unsubscribes from the Single
, then re-subscribes when the new activity/fragment instance starts observing the LiveData
again. For a cold observable like this one, this causes our lambda expression to be re-evaluated. Both that newly-generated result and the previously-cached result from the LiveData
will be delivered to our activity’s (or fragment’s) observer. The observer needs to be aware of this and take appropriate steps: it needs to replace any existing data, not add to it.
From LiveData to RxJava
If, for some reason, you need to convert a LiveData
to something in the RxJava space, toPublisher()
on LiveDataReactiveStreams
can adapt a LiveData
to a Publisher
. On its own, Publisher
only offers a subscribe()
method. However, Observable.fromPublisher()
can adapt a Publisher
into an Observable
, and from there you can set up RxJava chains as needed.
Prev Table of Contents Next
This book is licensed under the Creative Commons Attribution-ShareAlike 4.0 International license.