From RxJava to Coroutines

Mark Murphy, CommonsWare
mmurphy@commonsware.com

New Talk, Who Dis?

What We're Doing!

  • Taking a trivial National Weather Service app...
  • ...that is written using RxJava...
  • ...and converting it to use coroutines and Flow

How You Can Play Along!

  • Download and import the starter project
  • Follow the instructions
    • Live
    • From the PDF

Step #1: Reviewing What We Have

  • A Retrofit-based remote data source
  • A Room-based local data source
  • A repository to blend those
  • A ViewModel
  • An activity

Step #2: Reviewing What We Will Change

  • Replace Completable and Single with coroutines
  • Replace Observable with Flow

A Quick Coroutines Refresher


fun doSomething() {
  viewModelScope.launch(Dispatchers.Main) {
    println("This is executed before the delay")
    doSomeNetworking()
    println("This is executed after the delay")
  }

  println("This is executed immediately")
}

suspend fun doSomeNetworking() {
  withContext(Dispatchers.IO) {
    delay(2000L)  // or blocking network IO
  }
}

Step #3: Adding a Coroutines Dependency


implementation "androidx.room:room-ktx:$room_version"

  • room-ktx for Room Kotlin extensions
  • Pulls in coroutines via transitive dependency

Step #4: Converting the Remote Data Source

  • Convert API's getCurrentObservation() to suspend
  • Convert data source's getCurrentObservation() to suspend
  • Remove RxJava2CallAdapterFactory
  • Remove all RxJava-related imports
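
As a rough sketch of what this step amounts to: the API function and the data source function both become suspend functions that return the payload directly instead of a Single. The names NwsApi, ObservationResponse, ObservationRemoteDataSource, FakeNwsApi, and the stationId parameter are hypothetical stand-ins, since the slides do not show the project's real declarations. In the real project NwsApi would be the annotated Retrofit interface; a fake is used here so the sketch runs with only kotlinx.coroutines on the classpath.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical response type; the project's real model is not shown in the slides.
data class ObservationResponse(val id: String, val temperatureCelsius: Double?)

// Stand-in for the Retrofit interface (assumption). Before the conversion this
// function would have returned Single<ObservationResponse>.
interface NwsApi {
  suspend fun getCurrentObservation(stationId: String): ObservationResponse
}

// After the conversion the data source simply forwards the suspend call.
class ObservationRemoteDataSource(private val api: NwsApi) {
  suspend fun getCurrentObservation(stationId: String): ObservationResponse =
    api.getCurrentObservation(stationId)
}

// Fake implementation so the sketch runs without a network or Retrofit.
class FakeNwsApi : NwsApi {
  override suspend fun getCurrentObservation(stationId: String) =
    ObservationResponse(id = stationId, temperatureCelsius = 21.5)
}

fun main(): Unit = runBlocking {
  val remote = ObservationRemoteDataSource(FakeNwsApi())
  println(remote.getCurrentObservation("KDCA"))
}
```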

Step #5: Altering the Local Data Source

  • Change save() and clear()
    • Add suspend
    • Remove return type
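
The change to save() and clear() might look roughly like this. ObservationStore, ObservationEntity, and InMemoryObservationStore are hypothetical names; in the real project the store would be a Room DAO whose functions previously returned Completable. An in-memory fake stands in for Room so the sketch is runnable on its own.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical entity; the project's real Room entity is not shown in the slides.
data class ObservationEntity(val id: String, val temperatureCelsius: Double?)

// Stand-in for the Room DAO (assumption). With suspend added and the
// Completable return type removed, both functions implicitly return Unit.
interface ObservationStore {
  suspend fun save(entity: ObservationEntity)
  suspend fun clear()
}

// In-memory fake so the sketch runs without Room.
class InMemoryObservationStore : ObservationStore {
  val rows = mutableMapOf<String, ObservationEntity>()

  override suspend fun save(entity: ObservationEntity) {
    rows[entity.id] = entity
  }

  override suspend fun clear() {
    rows.clear()
  }
}

fun main(): Unit = runBlocking {
  val store = InMemoryObservationStore()
  store.save(ObservationEntity("KDCA", 21.5))
  println(store.rows.size)
  store.clear()
  println(store.rows.size)
}
```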

Step #6: Adjusting the Repository

  • Add suspend to clear()
  • Rewrite refresh()
    • Add suspend
    • Call convertToEntity() directly
    • Call save() on the observationStore() directly
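
A minimal sketch of the rewritten repository, assuming the data sources have already been converted. What was a chain of Rx operators becomes ordinary sequential code. The constructor parameters, the fake classes, and the field names are hypothetical; only clear(), refresh(), convertToEntity(), and save() come from the slides.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical types standing in for the project's real ones.
data class ObservationResponse(val id: String, val temperatureCelsius: Double?)
data class ObservationEntity(val id: String, val temperatureCelsius: Double?)

interface RemoteDataSource {
  suspend fun getCurrentObservation(): ObservationResponse
}

interface ObservationStore {
  suspend fun save(entity: ObservationEntity)
  suspend fun clear()
}

class ObservationRepository(
  private val remote: RemoteDataSource,
  private val store: ObservationStore
) {
  suspend fun clear() = store.clear()

  // Fetch, convert, save: each call suspends until the previous one is done.
  suspend fun refresh() {
    val response = remote.getCurrentObservation()
    store.save(convertToEntity(response))
  }

  private fun convertToEntity(response: ObservationResponse) =
    ObservationEntity(response.id, response.temperatureCelsius)
}

// Fakes so the sketch runs without Retrofit or Room.
class FakeRemote : RemoteDataSource {
  override suspend fun getCurrentObservation() = ObservationResponse("KDCA", 21.5)
}

class FakeStore : ObservationStore {
  val saved = mutableListOf<ObservationEntity>()
  override suspend fun save(entity: ObservationEntity) { saved.add(entity) }
  override suspend fun clear() { saved.clear() }
}

fun main(): Unit = runBlocking {
  val store = FakeStore()
  val repo = ObservationRepository(FakeRemote(), store)
  repo.refresh()
  println(store.saved)
  repo.clear()
  println(store.saved)
}
```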

Step #7: Modifying the Motor


implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-rc01"

  • Rewrite clear() and refresh()
    • Use viewModelScope.launch()
    • Wrap simple repo calls in try/catch
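
The launch-plus-try/catch pattern could be sketched as follows. This is not the project's actual motor: a plain CoroutineScope is passed in where the real class would use viewModelScope, lastError stands in for whatever error view-state the app publishes, and the functions return a Job only so the example can wait for them. Repository, FlakyRepository, and launchSafely are hypothetical names.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical repository contract with the two suspend functions from Step #6.
interface Repository {
  suspend fun refresh()
  suspend fun clear()
}

class MainMotor(private val repo: Repository, private val scope: CoroutineScope) {
  // Stand-in for an error view-state (assumption).
  var lastError: Throwable? = null
    private set

  fun refresh(): Job = launchSafely { repo.refresh() }

  fun clear(): Job = launchSafely { repo.clear() }

  private fun launchSafely(block: suspend () -> Unit): Job = scope.launch {
    try {
      block()
    } catch (e: CancellationException) {
      throw e // let cancellation propagate normally
    } catch (e: Exception) {
      lastError = e
    }
  }
}

// Fake repository whose refresh() always fails.
class FlakyRepository : Repository {
  override suspend fun refresh() {
    throw IllegalStateException("network down")
  }

  override suspend fun clear() {}
}

fun main(): Unit = runBlocking {
  val motor = MainMotor(FlakyRepository(), this)
  motor.refresh().join()
  println(motor.lastError?.message)
}
```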

RxJava Equivalents

  • Completable = suspend returning Unit
  • Single = suspend returning result
  • Maybe = ¯\_(ツ)_/¯
  • Observable = Flow or Channel
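
To make the mapping concrete, here is a small sketch with one function per RxJava type. NoteStore and its functions are hypothetical and unrelated to the weather app. The nullable return used for Maybe is only one possible translation and is an assumption, not something the talk prescribes.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

class NoteStore {
  private val notes = linkedMapOf<Int, String>()

  // Completable -> suspend function returning Unit
  suspend fun save(id: Int, text: String) {
    notes[id] = text
  }

  // Single -> suspend function returning the result (throws if absent)
  suspend fun load(id: Int): String = notes.getValue(id)

  // Maybe -> nullable result (assumption: one option among several)
  suspend fun find(id: Int): String? = notes[id]

  // Observable -> Flow, here over a snapshot taken when all() is called
  fun all(): Flow<String> = notes.values.toList().asFlow()
}

fun main(): Unit = runBlocking {
  val store = NoteStore()
  store.save(1, "hello")
  println(store.load(1))
  println(store.find(2))
  println(store.all().toList())
}
```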

Flows and Channels

  • Channel ~= hot Observable
    • Emits data even if nobody is paying attention
    • Usually from external source (server, GPS, etc.)
  • Flow ~= cold Observable
    • Emits data only once something is observing
    • Stops emitting data once it loses its last observer
    • Some operators (e.g., buffer(), flowOn()) use a Channel internally
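
The hot/cold distinction can be observed directly. In this sketch (function names are hypothetical) the flow builder block runs once per collection and not at all before that, while a buffered Channel accepts items even though nothing is receiving yet. trySend() is used because newer kotlinx.coroutines releases deprecate offer(), which the slides use later.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times a cold flow's builder block runs.
suspend fun coldStartsAfter(collections: Int): Int {
  var starts = 0
  val cold = flow {
    starts++ // runs only when something collects
    emit("a")
  }
  repeat(collections) { cold.toList() }
  return starts
}

// Sends into a channel with no receiver, then drains it afterwards.
suspend fun drainUnobservedChannel(): List<String> {
  val hot = Channel<String>(Channel.UNLIMITED)
  hot.trySend("a") // accepted and buffered although nobody is receiving
  hot.trySend("b")
  hot.close()
  val received = mutableListOf<String>()
  for (item in hot) received.add(item)
  return received
}

fun main(): Unit = runBlocking {
  println(coldStartsAfter(0))
  println(coldStartsAfter(2))
  println(drainUnobservedChannel())
}
```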

A Quick Floverview


fun doSomething() {
  viewModelScope.launch(Dispatchers.Main) {
    randomPercentages(10, 200).collect { println(it) }
    println("That's all folks!")
  }

  println("...and we're off!")
}

fun randomPercentages(count: Int, delayMs: Long) = flow {
  for (i in 0 until count) {
    delay(delayMs)
    emit(Random.nextInt(1, 100))  // upper bound is exclusive: 1..99
  }
}

Step #8: Adding Flow for Room


def room_version = "2.2.0-alpha02"

  • Room support for Flow added last week!
  • Upgrading Room pulls in a release candidate of coroutines that includes Flow

Step #9: Flowing from Room

  • Have load() return Flow instead of Observable
  • Remove all RxJava-related imports

Step #10: Flowing from Our Repository

  • Have load() in IObservationRepository return Flow
  • Have load() in ObservationRepository return Flow
    • Requires import kotlinx.coroutines.flow.map
  • Remove all RxJava-related imports
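
A sketch of the repository's load() after this step, assuming the store already returns a Flow. The outer map is the Flow operator from kotlinx.coroutines.flow.map, which is why the import is needed; the inner map is the ordinary list function. The entity and model fields, toModel(), and FakeStore are hypothetical.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical types; the project's real entity and model are not shown.
data class ObservationEntity(val id: String, val temperatureCelsius: Double?)
data class ObservationModel(val id: String, val temperatureCelsius: Double?)

fun ObservationEntity.toModel() = ObservationModel(id, temperatureCelsius)

// Stand-in for the Room DAO, which now returns Flow instead of Observable.
interface ObservationStore {
  fun load(): Flow<List<ObservationEntity>>
}

interface IObservationRepository {
  fun load(): Flow<List<ObservationModel>>
}

class ObservationRepository(private val store: ObservationStore) : IObservationRepository {
  // Flow.map transforms each emitted list; List.map converts each element.
  override fun load(): Flow<List<ObservationModel>> =
    store.load().map { entities -> entities.map { it.toModel() } }
}

// Fake store emitting a single list so the sketch runs without Room.
class FakeStore : ObservationStore {
  override fun load() = flowOf(listOf(ObservationEntity("KDCA", 21.5)))
}

fun main(): Unit = runBlocking {
  println(ObservationRepository(FakeStore()).load().toList())
}
```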

Step #11: Collecting our Flow

  • Rewrite init to collect() our Flow
    • Simple map() to our view-states
    • Simple try/catch for our error states
  • Remove sub and onCleared()
  • Remove all RxJava-related imports
  • Try running the app
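
The collecting side might look roughly like this. It is a sketch under assumptions: a CoroutineScope is injected where the real motor would use viewModelScope, state is a plain property rather than whatever observable holder the app uses, and MainViewState with its three cases is a hypothetical shape for the view-states. The job property exists only so the example can wait for collection to finish. The lambda form of collect() without an extra import assumes a recent kotlinx.coroutines release.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical view-state hierarchy.
sealed class MainViewState {
  object Loading : MainViewState()
  data class Content(val observations: List<String>) : MainViewState()
  data class Error(val throwable: Throwable) : MainViewState()
}

class MainMotor(load: Flow<List<String>>, scope: CoroutineScope) {
  var state: MainViewState = MainViewState.Loading
    private set

  // Exposed only so this sketch can wait for the collection to finish.
  val job: Job

  init {
    job = scope.launch {
      try {
        // map() turns each emission into a view-state; collect() publishes it.
        load.map { MainViewState.Content(it) }.collect { state = it }
      } catch (e: CancellationException) {
        throw e // let cancellation propagate normally
      } catch (e: Exception) {
        state = MainViewState.Error(e)
      }
    }
  }
}

fun main(): Unit = runBlocking {
  val ok = MainMotor(flowOf(listOf("KDCA")), this)
  ok.job.join()
  println(ok.state)

  val failing = MainMotor(flow<List<String>> { throw IllegalStateException("boom") }, this)
  failing.job.join()
  println(failing.state)
}
```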

Step #12: Reviewing Our Instrumented Tests

  • MainMotorTest with four test functions
  • InstantTaskExecutorRule to serialize Architecture Components operations
  • AndroidSchedulerRule to give us control over RxJava timing
  • Mockito for repository

Step #13: Repairing Our Tests

Part 1: Rules, Rules, Rules

  • Add a MainDispatcherRule as analog for AndroidSchedulerRule
  • Switch to using MainDispatcherRule
  • Delete AndroidSchedulerRule
  • Add @ExperimentalCoroutinesApi to MainMotorTest

Part 2: Fixing initialLoad()


whenever(repo.load()).thenReturn(flowOf(listOf(TEST_MODEL)))

mainDispatcherRule.dispatcher.runCurrent()

Part 3: Fixing initialLoadError()


whenever(repo.load()).thenReturn(flow { throw TEST_ERROR })

mainDispatcherRule.dispatcher.runCurrent()

Part 4: Fixing refresh()


val channel = Channel<List<ObservationModel>>()
whenever(repo.load()).thenReturn(channel.consumeAsFlow())

  • Add @FlowPreview to MainMotorTest

channel.offer(listOf(TEST_MODEL))
mainDispatcherRule.dispatcher.runCurrent()

channel.offer(listOf(TEST_MODEL, TEST_MODEL_2))
mainDispatcherRule.dispatcher.runCurrent()
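
Outside of a test rule, the idea of driving a collector from a Channel can be sketched as below: whatever is sent into the channel comes out of the Flow returned by consumeAsFlow(), one emission per send. This is an illustration only and differs from the test code above. It uses an unlimited buffer and send() instead of a rendezvous channel with offer(), and waits with join() instead of runCurrent(). feedAndCollect is a hypothetical helper.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends each batch into a channel and returns what a Flow collector saw.
suspend fun feedAndCollect(vararg batches: List<String>): List<List<String>> = coroutineScope {
  val channel = Channel<List<String>>(Channel.UNLIMITED)
  val seen = mutableListOf<List<String>>()

  // The collector plays the role of the motor under test.
  val collector = launch {
    channel.consumeAsFlow().collect { seen.add(it) }
  }

  for (batch in batches) channel.send(batch)
  channel.close() // closing the channel completes the flow
  collector.join()
  seen
}

fun main(): Unit = runBlocking {
  println(feedAndCollect(listOf("a"), listOf("a", "b")))
}
```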

Part 5: Fixing clear()


val channel = Channel<List<ObservationModel>>()
whenever(repo.load()).thenReturn(channel.consumeAsFlow())

channel.offer(listOf(TEST_MODEL))
mainDispatcherRule.dispatcher.runCurrent()

channel.offer(listOf())
mainDispatcherRule.dispatcher.runCurrent()

Part 6: Is It Soup Yet?

  • Remove any RxJava imports
  • Run the tests
  • Confirm they run without failures

Step #14: Ruthlessly Removing RxJava

  • Remove all RxJava-related dependencies

implementation "androidx.room:room-rxjava2:$room_version"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
implementation "com.squareup.retrofit2:adapter-rxjava2:$retrofit_version"
implementation 'nl.littlerobots.rxlint:rxlint:1.7.4'

  • Confirm tests still run
  • Confirm app still runs
  • Celebrate!