From RxJava to Coroutines
Mark Murphy, CommonsWare
mmurphy@commonsware.com
New Talk, Who Dis?
What We're Doing!
- Taking a trivial National Weather Service app...
- ...that is written using RxJava...
- ...and converting it to use coroutines and Flow
How You Can Play Along!
- Download and import the starter project
- Follow the instructions
Step #1: Reviewing What We Have
- A Retrofit-based remote data source
- A Room-based local data source
- A repository to blend those
- A ViewModel
- An activity
Step #2: Reviewing What We Will Change
- Replace Completable and Single with coroutines
- Replace Observable with Flow
A Quick Coroutines Refresher
fun doSomething() {
    viewModelScope.launch(Dispatchers.Main) {
        println("This is executed before the delay")
        doSomeNetworking()
        println("This is executed after the delay")
    }
    println("This is executed immediately")
}

suspend fun doSomeNetworking() {
    withContext(Dispatchers.IO) {
        delay(2000L) // or blocking network IO
    }
}
Step #3: Adding a Coroutines Dependency
implementation "androidx.room:room-ktx:$room_version"
- room-ktx for Room Kotlin extensions
- Pulls in coroutines via transitive dependency
Step #4: Converting the Remote Data Source
- Convert the API's getCurrentObservation() to suspend
- Convert the data source's getCurrentObservation() to suspend
- Remove RxJava2CallAdapterFactory
- Remove all RxJava-related imports
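The shape of this conversion might look roughly like the following. It is only a sketch: `ObservationResponse`, `WeatherApi`, `ObservationRemoteDataSource`, and `FakeWeatherApi` are hypothetical stand-ins rather than the sample app's actual classes, and the Retrofit annotations are left out so the snippet needs nothing beyond kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical response model; the real app's type will differ.
data class ObservationResponse(val id: String, val temperatureCelsius: Double?)

// Before (RxJava): fun getCurrentObservation(): Single<ObservationResponse>
// After (coroutines): a suspend function that returns the value directly.
interface WeatherApi {
    suspend fun getCurrentObservation(): ObservationResponse
}

// The data source mirrors the change: suspend in, plain value out.
class ObservationRemoteDataSource(private val api: WeatherApi) {
    suspend fun getCurrentObservation(): ObservationResponse =
        api.getCurrentObservation()
}

// Fake API standing in for the Retrofit-generated implementation.
class FakeWeatherApi : WeatherApi {
    override suspend fun getCurrentObservation() =
        ObservationResponse(id = "fake-station", temperatureCelsius = 21.5)
}

fun main() = runBlocking<Unit> {
    val remote = ObservationRemoteDataSource(FakeWeatherApi())
    println(remote.getCurrentObservation())
}
```

Callers no longer subscribe to anything: they call the function from a coroutine and get the response back as an ordinary return value.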
Step #5: Altering the Local Data Source
- Change save() and clear()
  - Add suspend
  - Remove return type
Step #6: Adjusting the Repository
- Add suspend to clear()
- Rewrite refresh()
  - Add suspend
  - Call convertToEntity() directly
  - Call save() on the observationStore() directly
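A minimal sketch of the rewritten repository functions, assuming simplified stand-in types: `RemoteSource`, `ObservationStore`, `InMemoryStore`, and the two data classes are hypothetical, and the store is passed in directly instead of being obtained through `observationStore()`.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical models; the real app's fields will differ.
data class ObservationResponse(val id: String, val temperatureCelsius: Double?)
data class ObservationEntity(val id: String, val temperatureCelsius: Double?)

interface RemoteSource {
    suspend fun getCurrentObservation(): ObservationResponse
}

interface ObservationStore {
    suspend fun save(entity: ObservationEntity)
    suspend fun clear()
}

class ObservationRepository(
    private val remote: RemoteSource,
    private val store: ObservationStore
) {
    // No chained Rx operators: each step is an ordinary sequential call.
    suspend fun refresh() {
        val response = remote.getCurrentObservation()
        store.save(convertToEntity(response))
    }

    suspend fun clear() = store.clear()

    private fun convertToEntity(response: ObservationResponse) =
        ObservationEntity(response.id, response.temperatureCelsius)
}

// In-memory store standing in for the Room DAO.
class InMemoryStore : ObservationStore {
    val saved = mutableListOf<ObservationEntity>()
    override suspend fun save(entity: ObservationEntity) { saved += entity }
    override suspend fun clear() { saved.clear() }
}

fun main() = runBlocking<Unit> {
    val store = InMemoryStore()
    val remote = object : RemoteSource {
        override suspend fun getCurrentObservation() = ObservationResponse("fake-station", 21.5)
    }
    val repo = ObservationRepository(remote, store)
    repo.refresh()
    println(store.saved)
    repo.clear()
    println(store.saved)
}
```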
Step #7: Modifying the Motor
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-rc01"
- Rewrite clear() and refresh()
  - Use viewModelScope.launch()
  - Wrap simple repo calls in try/catch
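The launch-plus-try/catch pattern could be sketched as below. This is an illustration under assumptions, not the sample's motor: `MainViewState` and `Repo` are hypothetical, and a plain `CoroutineScope` constructor parameter stands in for `viewModelScope` so that the snippet runs without Android.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical view-state type for illustration.
sealed class MainViewState {
    object Loading : MainViewState()
    object Content : MainViewState()
    data class Error(val throwable: Throwable) : MainViewState()
}

interface Repo {
    suspend fun refresh()
}

// `scope` is a stand-in for viewModelScope.
class MainMotor(private val repo: Repo, private val scope: CoroutineScope) {
    var state: MainViewState = MainViewState.Loading
        private set

    fun refresh() {
        scope.launch {
            try {
                repo.refresh()
                state = MainViewState.Content
            } catch (e: CancellationException) {
                throw e // let cancellation propagate instead of reporting it as an error
            } catch (e: Exception) {
                state = MainViewState.Error(e)
            }
        }
    }
}

fun main() {
    val failingRepo = object : Repo {
        override suspend fun refresh() { throw IllegalStateException("offline") }
    }
    // runBlocking waits for the launched child before returning.
    val motor = runBlocking { MainMotor(failingRepo, this).also { it.refresh() } }
    println(motor.state)
}
```

The error path that RxJava would deliver to an `onError` callback becomes an ordinary `catch` block.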
RxJava Equivalents
Completable = suspend returning Unit
Single = suspend returning result
Maybe = ¯\_(ツ)_/¯
Observable = Flow or Channel
Flows and Channels
- Channel ~= hot Observable
  - Emits data even if nobody is paying attention
  - Usually from an external source (server, GPS, etc.)
- Flow ~= cold Observable
  - Emits data only once something is observing
  - Stops emitting data once it loses its last observer
  - Built atop Channel
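To make the "cold" behavior concrete, here is a small sketch (the `log` list and `numbers()` function are made up for illustration). The body of the `flow` builder does not run when the flow is created, only when something collects it.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records the order in which things happen.
val log = mutableListOf<String>()

fun numbers(): Flow<Int> = flow {
    log += "producer started" // runs only once a collector shows up
    emit(1)
    emit(2)
}

fun main() = runBlocking<Unit> {
    val cold = numbers()          // nothing is emitted yet
    log += "flow created"
    val received = cold.toList()  // terminal operator: starts the producer
    log += "received $received"
    println(log)
    // prints [flow created, producer started, received [1, 2]]
}
```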
A Quick Floverview
fun doSomething() {
    viewModelScope.launch(Dispatchers.Main) {
        randomPercentages(10, 200).collect { println(it) }
        println("That's all folks!")
    }
    println("...and we're off!")
}

fun randomPercentages(count: Int, delayMs: Long) = flow {
    for (i in 0 until count) {
        delay(delayMs)
        emit(Random.nextInt(1, 100))
    }
}
Step #8: Adding Flow for Room
def room_version = "2.2.0-alpha02"
- Room support for Flow added last week!
- Upgrading Room pulls in an RC of coroutines with Flow
Step #9: Flowing from Room
- Have load() return Flow instead of Observable
- Remove all RxJava-related imports
Step #10: Flowing from Our Repository
- Have load() in IObservationRepository return Flow
- Have load() in ObservationRepository return Flow
  - Requires import kotlinx.coroutines.flow.map
- Remove all RxJava-related imports
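A rough sketch of what the Flow-returning load() might look like, with assumptions flagged: the entity and model classes are hypothetical, and the repository receives a `Flow` of entities directly as a stand-in for the Room DAO's load(). Note that load() itself is not a suspend function; only collecting the Flow suspends.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical types; the real app's fields will differ.
data class ObservationEntity(val id: String, val temperatureCelsius: Double?)
data class ObservationModel(val id: String, val temperatureCelsius: Double?)

interface IObservationRepository {
    fun load(): Flow<List<ObservationModel>>
}

// `stored` stands in for the Flow returned by the Room DAO.
class ObservationRepository(
    private val stored: Flow<List<ObservationEntity>>
) : IObservationRepository {
    // The outer map is the Flow operator (hence the import);
    // the inner map is the ordinary List function.
    override fun load(): Flow<List<ObservationModel>> =
        stored.map { entities ->
            entities.map { ObservationModel(it.id, it.temperatureCelsius) }
        }
}

fun main() = runBlocking<Unit> {
    val repo = ObservationRepository(flowOf(listOf(ObservationEntity("fake-station", 21.5))))
    println(repo.load().toList())
}
```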
Step #11: Collecting our Flow
- Rewrite init to collect() our Flow
  - Simple map() to our view-states
  - Simple try/catch for our error states
- Remove sub and onCleared()
- Remove all RxJava-related imports
- Try running the app
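The collecting side could be sketched as follows. Assumptions: `MainViewState`, `Repo`, and the `List<String>` payload are hypothetical; a `CoroutineScope` parameter stands in for `viewModelScope`; and the snippet targets a recent kotlinx.coroutines release (1.6 or later), where `collect { }` needs no separate import.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical view-state type for illustration.
sealed class MainViewState {
    object Loading : MainViewState()
    data class Content(val observations: List<String>) : MainViewState()
    data class Error(val throwable: Throwable) : MainViewState()
}

interface Repo {
    fun load(): Flow<List<String>>
}

// `scope` is a stand-in for viewModelScope.
class MainMotor(repo: Repo, scope: CoroutineScope) {
    var state: MainViewState = MainViewState.Loading
        private set

    init {
        scope.launch {
            try {
                repo.load()
                    .map { MainViewState.Content(it) }
                    .collect { state = it }
            } catch (e: CancellationException) {
                throw e // cancellation is not an error state
            } catch (e: Exception) {
                state = MainViewState.Error(e)
            }
        }
    }
}

fun main() {
    val okRepo = object : Repo {
        override fun load() = flowOf(listOf("observation"))
    }
    val badRepo = object : Repo {
        override fun load() = flow<List<String>> { throw IllegalStateException("boom") }
    }
    // runBlocking waits for the collecting coroutine to finish.
    println(runBlocking { MainMotor(okRepo, this) }.state)
    println(runBlocking { MainMotor(badRepo, this) }.state)
}
```

Because the collection runs inside the scope, cancelling the scope ends it, which is why a manual subscription field and an onCleared() override are no longer needed.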
Step #12: Reviewing Our Instrumented Tests
- MainMotorTest with four test functions
- InstantTaskExecutorRule to serialize Architecture Components operations
- AndroidSchedulerRule to give us control over RxJava timing
- Mockito for repository
Step #13: Repairing Our Tests
Part 1: Rules, Rules, Rules
- Add a MainDispatcherRule as analog for AndroidSchedulerRule
- Switch to using MainDispatcherRule
- Delete AndroidSchedulerRule
- Add @ExperimentalCoroutinesApi to MainMotorTest
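One plausible shape for such a rule is sketched below. It is an assumption, not the project's actual class: it presumes JUnit 4 and the kotlinx-coroutines-test artifact of that era, whose `TestCoroutineDispatcher` provides the `runCurrent()` call used in the later parts (newer kotlinx-coroutines-test releases deprecate this class in favor of other test dispatchers).

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.rules.TestWatcher
import org.junit.runner.Description

// Sketch of a JUnit 4 rule that swaps Dispatchers.Main for a test dispatcher.
@ExperimentalCoroutinesApi
class MainDispatcherRule(
    val dispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher()
) : TestWatcher() {

    override fun starting(description: Description) {
        super.starting(description)
        // Code that uses Dispatchers.Main (e.g. viewModelScope) now runs here.
        Dispatchers.setMain(dispatcher)
    }

    override fun finished(description: Description) {
        super.finished(description)
        Dispatchers.resetMain()
        dispatcher.cleanupTestCoroutines()
    }
}
```

A test class would declare it with `@get:Rule val mainDispatcherRule = MainDispatcherRule()` and then call `mainDispatcherRule.dispatcher.runCurrent()` to run pending coroutine work at a known point.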
Step #13: Repairing Our Tests
Part 2: Fixing initialLoad()
whenever(repo.load()).thenReturn(flowOf(listOf(TEST_MODEL)))
mainDispatcherRule.dispatcher.runCurrent()
Step #13: Repairing Our Tests
Part 3: Fixing initialLoadError()
whenever(repo.load()).thenReturn(flow { throw TEST_ERROR })
mainDispatcherRule.dispatcher.runCurrent()
Step #13: Repairing Our Tests
Part 4: Fixing refresh()
val channel = Channel<List<ObservationModel>>()
whenever(repo.load()).thenReturn(channel.consumeAsFlow())
- Add @FlowPreview to MainMotorTest
channel.offer(listOf(TEST_MODEL))
mainDispatcherRule.dispatcher.runCurrent()
channel.offer(listOf(TEST_MODEL, TEST_MODEL_2))
mainDispatcherRule.dispatcher.runCurrent()
Step #13: Repairing Our Tests
Part 5: Fixing clear()
val channel = Channel<List<ObservationModel>>()
whenever(repo.load()).thenReturn(channel.consumeAsFlow())
channel.offer(listOf(TEST_MODEL))
mainDispatcherRule.dispatcher.runCurrent()
channel.offer(listOf())
mainDispatcherRule.dispatcher.runCurrent()
Step #13: Repairing Our Tests
Part 6: Is It Soup Yet?
- Remove any RxJava imports
- Run the tests
- Confirm they run without failures
Step #14: Ruthlessly Removing RxJava
- Remove all RxJava-related dependencies
implementation "androidx.room:room-rxjava2:$room_version"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
implementation "com.squareup.retrofit2:adapter-rxjava2:$retrofit_version"
implementation 'nl.littlerobots.rxlint:rxlint:1.7.4'
- Confirm tests still run
- Confirm app still runs
- Celebrate!