How to test Hot Flows with Coroutines Test library 1.60


Flows are a very nice replacement for LiveData, however I could not find an updated guide on how to unit test them inside the ViewModel. This guide intends to help you on that task.

In my Grading Scale App, I adopted Flows with a lot enthusiasm after building the first version with RXJava. Flows feel much easier to work with, require less boilerplate and get the job done without having to learn as much operators as with RXJava.

Flows are the recent trend taking over LiveData as reactive containers for ViewModel – UI. You can have a look at the whole advantages of them in the very nice talk of Manuel Vivo and Jose Alcérreca from Google in the GoogleIO from 2021: Kotlin Flows in Practice:
There is also the article of Jose Alcérreca from Google that explains this concept:

class CalculatorFragmentViewModel(
  dataSource: FlowDataSource,
) : ViewModel() {
  //Example StateFlow from the dataSource
  val gradeScalesNames: StateFlow<List<String>> = dataSource.flowGradeScaleNames()
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), initialValue)

  //Example SharedFlow
  val gradesAndScale: SharedFlow<List<Pair<Grade, GradesInScale>>> =
      .shareIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), 1)

With the new lifecycle extensions and function “viewLifeCycleOwner.repeatOnLifeCycle(Lifecycle.State.STARTED)” it is very easy to replace LiveData completely. You can even add an extension function to observe Flows in a LiveData similar way:

class CalculatorFragment : Fragment() {

  //Extension function to safely collect flows, in a LiveData style
  private fun <T> Flow<T>.safeCollect(function: FlowCollector<T>) {
    viewLifecycleOwner.lifecycleScope.launch {
      viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {

  //Function to collect from a Flow in ViewModel to populate a TextSpinner with the list of names in our Flow
  private fun setupGradeScaleSelectionSpinner() {
    viewModel.gradeScalesNames.safeCollect { listGSNames ->
      //... logic to input in our spinners


So I removed in my App LiveData in 3 Fragments and their ViewModels succesfully, App runs nicely, less boilerplate in ViewModel, simple and powerful operators with Flows, not restricted to Main thread, etc…

The problem I faced, was trying to unit test them, specially when using Hot Flows (Shared and StateFlow) inside the ViewModel. Those HotFlows inside the ViewModel need to have a CoroutineScope and that the best option is the viewModelScope. But how to test those when this scope cannot be injected so easily – specially when using most recent coroutine test library 1.60 version? There are many guides outside, and even on the Google Apps, some guidelines, however none with the latest version.


Add the coroutine test dependencies in your gradle module file. The new 1.6.0 version has a very useful runTest function and some nice TestCoroutineScopes, with TestCoroutineSchedules, that allow us to have a very fine control of the execution. It has however some breaking changes, so beware if you have existing tests, which use previous methods to test coroutines.

    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.0'  //Current guide uses the 1.6.0 version 

To use hot flows (SharedFlow or StateFlow) inside a ViewModel we need to use the “ViewmodelScope” to launch coroutines. This give us the advantage of a “viewmodel lifecycle aware” scope, in which we safely can have those. Unfortunately the ViewModelScope uses internally as Dispatcher the “Dispatchers.Main.immediate”, which is part of the Android Framework, but that we do not wish to have access in our unit tests. So we need to add some rules to our tests that will replace everytime the use of “Dispatchers.Main” with our StandardTestDispatcher.

The new StandardTestDispatcher allows us to have a very fine control over the coroutines, as it does not runs the tasks directly but sends them to a TestCoroutineScheduler which allows us fine control with functions such as:

  • TestCoroutineScheduler.runCurrent: Runs the tasks that are scheduled to execute at this moment of virtual time
  • TestCoroutineScheduler.advanceUntilIdle: Runs the enqueued tasks in the specified order, advancing the virtual time as needed until there are no more tasks associated with the dispatchers linked to this scheduler
  • TestCoroutineScheduler.advanceTimeBy: Moves the virtual clock of this dispatcher forward by the specified amount, running the scheduled tasks in the meantime

We create a CoroutinesTestRule which will replace before each test the “Dispatchers.Main” with our StandardTestDispatcher and allow us fine control.

class CoroutinesTestRule(
  val testDispatcher : TestDispatcher = StandardTestDispatcher()
) : TestWatcher(){
  override fun starting(description: Description?) {
  override fun finished(description: Description?) {

Now, we can use this CoroutineTestRule in our Tests:

class CalculatorFragmentViewModelTest {
  val coroutineTestRule = CoroutinesTestRule()


We can now do our unit test in ViewModel as usual: in the setup part inject our mocked or fake dependencies and by adding some handy inline functions


  lateinit var fakeFlowDataSource: FlowDataSource
  lateinit var viewModel: CalculatorFragmentViewModel
  fun setUp() {
    fakeFlowDataSource = FakeFlowDataSource()
    viewModel = CalculatorFragmentViewModel(fakeFlowDataSource)


And finally run our tests using the new runTest function. This together with the CoroutinesTestRule will replace use of Dispatchers.Main in code we test, which is important when testing our HotFlows. This is very handy, to avoid having to inject in our ViewModel any kind of Dispatcher Provider.

In this example we use a handy extension function launchInTest – to make sure the flow launches the collection in our TestScope using an UnconfinedTestDispatcher to ensure that launch and async blocks at the top level of runTest are entered eagerly.
We also use a handy “mutableListOfFlow()” function, which returns a MutableList that gets the emissions of the flow.

  fun <T> Flow<T>.launchInTest(scope: TestScope) = launchIn(scope + UnconfinedTestDispatcher(scope.testScheduler))

  @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
  fun <T> TestScope.mutableListOfFlow(flow: Flow<T>): MutableList<T>{
    val mutableListOfFlow = mutableListOf<T>()
      .onEach { mutableListOfFlow.add(it) }
    return mutableListOfFlow

Now we can go forward with our test, in this example we are going to use our handy “mutableListOfFlow” function

  fun `getGradeScalesNames StateFlow - starts with an empty list and then gets the list from our FlowDataSource`() = runTest {
    val collectedFlows = mutableListOfFlow(viewModel.gradeScalesNames)
    advanceUntilIdle() //Make sure that the viewModel Dispatcher runs until it is idle
    //We check that the first value in our StateFlow was empty
    assertThat(collectedFlows.first(), `is`(emptyList()))
    //We check that after the first value, our StateFlow gets the list of names from our FlowDataSource
    assertThat(collectedFlows[1], `is`(fo.flowGradeScaleNames().first()))
    //We need to cancel the collection of our flows, otherwise Test keeps active

By being able to collect the flow emissions into a MutableList, we can have a very detailed log of every value. This is specially useful when testing different interactions in our ViewModel that will make the flow emit new values. You dont need however to save to a mutableList. You can simply use a variable and store the latest result for your tests.

By using the advanceUntilIdle() function, we can call other functions in our ViewModel, let the Scheduler runs until it is idle, which will make all the Flows emit if they should.

NOTE: if we have Flows, that depend on the emissions of other flows, we need to make sure to collect those in our test. Here is an example:

  fun `setGradeScaleInPos sets the current GradeInScale`() = runTest() {
    //Variable where we will store the emissions
    var selectedGradeInScale: GradesInScale? = null
    var gradeScaleNames: List<String> = emptyList()
    //We need to make sure to collect the first flow, from where our tested flow is dependant
      .onEach { gradeScaleNames = it }
    //We now make sure that all emissions are done in the ViewModel, so now the first flow has a different value than the initial
    //This is the Flow we wish to test. This flow depends on the previous flow, so be aware of dependencies between flows!
      .onEach {
        selectedGradeInScale = it
    //Here we test, to get a GradeScale from the list of gradeScales 
    //Make sure all emissions are done
    //We now make a second operation, which should generate a new emission
    //Important to cancel all the flow collections

If you also need to test LiveData, you will need to add the dependency core-testing, then add the “InstantTaskExecutorRule” in your test class and then use the extension function “LiveData.asFlow()”. If you combine this function with our toMutableListOfFlow(), you get a MutableList which gets the emitted data in the LiveData

  //In your Gradle Module file
  testImplementation "androidx.arch.core:core-testing:$arch_version" 

  //at the beginning of your Test Class - to be able to observe LiveData
  val rule = InstantTaskExecutorRule()

  //You also need the CoroutinesTestRule - as LiveData also uses Dispatchers.Main and you need to use the TestDispatcher and TestScheduler
  val coroutineTestRule = CoroutinesTestRule()

  //Prepare your viewmodel, dependency injection of fakes or mocks, etc...
  fun setUp() {
    fakeFlowDataSource = FakeFlowDataSource()
    viewModel = CalculatorFragmentViewModel(fakeFlowDataSource)

  fun `test with LiveData`() = runTest {
    //liveDataValues is a mutableList which will store the emited values of the LiveData
    //Notice how we use the extensionFunction asFlow(), which converts this LiveData into a Flow
    val liveDataValues = mutableListOfFlow(viewModel.progressFlowAsLiveData.asFlow())
    //Here we make our normal calls to functions in our ViewModel, which cause LiveData to emmit
    //Make sure that the TestScheduler runs everything until idle
    //Here we can test our results 
    //Clean everything, cancel the coroutines - which automatically will also remove observers of LiveData within the asFlow() extension function

Another option is to use the asMutableList() function shown in the code below. I got inspired on the “LiveData.asFlow()” function.

fun <T> TestScope.asMutableList(liveData: LiveData<T>): MutableList<T> {
  //Here we store our emitted values - we will return this
  val mutableList = mutableListOf<T>()
  //Create a conflated Channel
  val channel = Channel<T>(Channel.CONFLATED)
  //Create a LiveValue Observer - emissions are sent to the Channel
  val observer = Observer<T> { channel.trySend(it) }
  //launches a coroutine - using our TestScope. This will be cancelled at the end of the test
  //using coroutineContext.cancelChildren
  launch(Dispatchers.Main.immediate) {
    //LiveData observer the previous created observer
    //Try block - suspends function in which each value received by the channel is added to the
    //mutable list. When Coroutines are cancelled, then the livedata removes the observer 
    //to clean up
    try {
      for (value in channel) {
    } finally {
  return mutableList

//You can then use inside your runTest{} as follows:
val liveDataValues = asMutableList(viewModel.liveData)

There are other options on how to test State or SharedFlows. For example by using the “Turbine” library as explained in this guide: I had however some difficulties to understand how to test flow emissions in StateFlow, where I expected only the last emission or value, but the “test” function forces to await for each emission manually. I find the method explained in this post a bit easier to write and understand for tests, as we can have a very fine control of emissions by collecting to MutableLists or collect only latest values into variables and can control the TestScheduler very precisely.

Please share this article if you find it useful! Thanks for reading!

, ,

Leave a Reply

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.