From RxJava to Coroutines
Mark Murphy, CommonsWare
mmurphy@commonsware.com
New Talk, Who Dis?
What We're Doing!
- Taking a trivial National Weather Service app...
- ...that is written using RxJava...
- ...and converting it to use coroutines and
Flow
How You Can Play Along!
- Download and import the starter project
-
Follow the Instructions
Step #1 Reviewing What We Have
- A Retrofit-based remote data source
- A Room-based local data source
- A repository to blend those
- A
ViewModel
- An activity
Step #2: Reviewing What We Will Change
- Replace
Completable
and Single
with coroutines
- Replace
Observable
with Flow
A Quick Coroutines Refresher
fun doSomething() {
viewModelScope.launch(Dispatchers.Main) {
println("This is executed before the delay")
doSomeNetworking()
println("This is executed after the delay")
}
println("This is executed immediately")
}
suspend fun doSomeNetworking() {
withContext(Dispatchers.IO) {
delay(2000L) // or blocking network IO
}
}
Step #3: Adding a Coroutines Dependency
implementation "androidx.room:room-ktx:$room_version"
room-ktx
for Room Kotlin extensions
- Pulls in coroutines via transitive dependency
Step #4: Converting the Remote Data Source
- Convert API's
getCurrentObservation()
to suspend
- Convert data source's
getCurrentObservation()
to suspend
- Remove
RxJava2CallAdapterFactory
- Remove all RxJava-related imports
Step #5: Alerting the Local Data Source
-
Change
save()
and clear()
- Add
suspend
- Remove return type
Step #6: Adjusting the Repository
- Add
suspend
to clear()
-
Rewrite
refresh()
- Add
suspend
- Call
convertToEntity()
directly
- Call
save()
on the observationStore()
directly
Step #7: Modifying the Motor
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-rc01"
-
Rewrite
clear()
and refresh()
- Use
viewModelScope.launch()
- Wrap simple
repo
calls in try
/catch
RxJava Equivalents
Completable
= suspend
returning Unit
Single
= suspend
returning result
Maybe
= ¯\_(ツ)_/¯
Observable
= Flow
or Channel
Flows and Channels
-
Channel
~= hot Observable
- Emits data even if nobody is paying attention
- Usually from external source (server, GPS, etc.)
-
Flow
~= cold Observable
- Emits data only once something is observing
- Stops emitting data once loses last observer
- Built atop
Channel
A Quick Floverview
fun doSomething() {
viewModelScope.launch(Dispatchers.Main) {
randomPercentages(10, 200).collect { println(it) }
println("That's all folks!")
}
println("...and we're off!")
}
fun randomPercentages(count: Int, delayMs: Long) = flow {
for (i in 0 until count) {
delay(delayMs)
emit(Random.nextInt(1,100))
}
}
Step #8: Adding Flow
for Room
def room_version = "2.2.0-alpha02"
- Room support for
Flow
added last week!
- Upgrading Room pulls in RC of coroutines with
Flow
Step #9: Flowing from Room
- Have
load()
return Flow
instead of Observable
- Remove all RxJava-related imports
Step #10: Flowing from Our Repository
- Have
load()
in IObservationRepository
return Flow
-
Have
load()
in ObservationRepository
return Flow
- Requires
import kotlinx.coroutines.flow.map
- Remove all RxJava-related imports
Step #11: Collecting our Flow
-
Rewrite
init
to collect()
our Flow
- Simple
map()
to our view-states
- Simple
try
/catch
for our error states
- Remove
sub
and onCleared()
- Remove all RxJava-related imports
- Try running the app
Step #12: Reviewing Our Instrumented Tests
MainMotorTest
with four test functions
InstantTaskExecutorRule
to serialize Architecture Components operations
AndroidSchedulerRule
to give us control over RxJava timing
- Mockito for repository
Step #13: Repairing Our Tests
Part 1: Rules, Rules, Rules
- Add a
MainDispatcherRule
as analog for AndroidSchedulerRule
- Switch to using
MainDispatcherRule
- Delete
AndroidSchedulerRule
- Add
@ExperimentalCoroutinesApi
to MainMotorTest
Step #13: Repairing Our Tests
Part 2: Fixing initialLoad()
whenever(repo.load()).thenReturn(flowOf(listOf(TEST_MODEL)))
mainDispatcherRule.dispatcher.runCurrent()
Step #13: Repairing Our Tests
Part 3: Fixing initialLoadError()
whenever(repo.load()).thenReturn(flow { throw TEST_ERROR })
mainDispatcherRule.dispatcher.runCurrent()
Step #13: Repairing Our Tests
Part 4: Fixing refresh()
val channel = Channel<List<ObservationModel>>()
whenever(repo.load()).thenReturn(channel.consumeAsFlow())
- Add
@FlowPreview
to MainMotorTest
channel.offer(listOf(TEST_MODEL))
mainDispatcherRule.dispatcher.runCurrent()
channel.offer(listOf(TEST_MODEL, TEST_MODEL_2))
mainDispatcherRule.dispatcher.runCurrent()
Step #13: Repairing Our Tests
Part 5: Fixing clear()
val channel = Channel<List<ObservationModel>>()
whenever(repo.load()).thenReturn(channel.consumeAsFlow())
channel.offer(listOf(TEST_MODEL))
mainDispatcherRule.dispatcher.runCurrent()
channel.offer(listOf())
mainDispatcherRule.dispatcher.runCurrent()
Step #13: Repairing Our Tests
Step 6: Is It Soup Yet?
- Remove any RxJava imports
- Run the tests
- Confirm they run without failures
Step #14: Ruthlessly Removing RxJava
- Remove all RxJava-related dependencies
implementation "androidx.room:room-rxjava2:$room_version"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
implementation "com.squareup.retrofit2:adapter-rxjava2:$retrofit_version"
implementation 'nl.littlerobots.rxlint:rxlint:1.7.4'
- Confirm tests still run
- Confirm app still runs
- Celebrate!