DEV Community

Sanjay Prajapat

Posted on • Edited on • Originally published at sanjayprajapat.hashnode.dev

How To Combine Flows In Kotlin

Kotlin coroutines provide the combine, zip, and flattenMerge operators for combining emissions from multiple flows.

Combine

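The combine operator takes the latest emission from each of two flows and produces a result. Once both flows have emitted at least one value, it produces a new result every time either flow emits.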

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // f1 emits 1, 2, 3 with a 300 ms delay before each value
    val f1 = flowOf(1, 2, 3).onEach { delay(300) }
    // f2 emits "x", "y", "z" with a 400 ms delay before each value
    val f2 = flowOf("x", "y", "z").onEach { delay(400) }
    val startTime = System.currentTimeMillis()
    f1.combine(f2) { num, str -> "$num -> $str" }
        .collect { result ->
            println("$result at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
/**
1 -> x at 424 ms from start
2 -> x at 627 ms from start
2 -> y at 826 ms from start
3 -> y at 927 ms from start
3 -> z at 1226 ms from start
*/
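The order of the results above follows from the two delays: f1 emits at roughly 300, 600 and 900 ms, f2 at roughly 400, 800 and 1200 ms, and combine produces a value on every emission once both flows have emitted. The sketch below collects the same two flows into a list so the sequence can be inspected without the timestamps. The helper name `combinedResults` is hypothetical and not part of the original example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: runs the same combine as above and returns the results as a list.
suspend fun combinedResults(): List<String> {
    val f1 = flowOf(1, 2, 3).onEach { delay(300) }
    val f2 = flowOf("x", "y", "z").onEach { delay(400) }
    return f1.combine(f2) { num, str -> "$num -> $str" }.toList()
}

fun main() = runBlocking<Unit> {
    // The emissions of f1 and f2 never coincide (300/600/900 ms vs. 400/800/1200 ms),
    // so the order is expected to match the output shown above.
    println(combinedResults())
}
```

Because the result depends on timing, changing either delay can change which pairs appear and in what order.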

Zip

Let's take the same flows as above. The zip operator pairs emissions by position: each time one flow emits, zip waits for the corresponding emission from the other flow. When both values are available, zip passes them to the lambda, here as a number and a letter.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val f1 = flowOf(1,2,3).onEach { delay(300) } 
    val f2 = flowOf("x", "y", "z").onEach { delay(400) }
    val startTime = System.currentTimeMillis() 
    f1.zip(f2) { num, str -> "$num -> $str" }
        .collect { result ->
            println("$result at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
/*
1 -> x at 437 ms from start
2 -> y at 837 ms from start
3 -> z at 1239 ms from start
*/

zip completes as soon as either flow completes. Any remaining values from the other flow are never paired and do not appear in the output:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val f1 = (1..4).asFlow()
    val f2 = flowOf("Hi", "Hello")
    f1.zip(f2) { a, b -> "$a -> $b" }
        .collect {
            println(it)
        }
}
/*
1 -> Hi
2 -> Hello
*/
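As a small check of the behaviour above, the following sketch collects the zipped flow into a list and prints how many pairs were produced. The helper name `zippedPairs` is hypothetical and only used for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: zips two flows of different lengths and returns the pairs.
suspend fun zippedPairs(): List<String> {
    val numbers = (1..4).asFlow()          // four values
    val greetings = flowOf("Hi", "Hello")  // two values
    return numbers.zip(greetings) { a, b -> "$a -> $b" }.toList()
}

fun main() = runBlocking<Unit> {
    val pairs = zippedPairs()
    // The result has as many elements as the shorter flow; 3 and 4 are never paired.
    println("${pairs.size} pairs: $pairs") // prints "2 pairs: [1 -> Hi, 2 -> Hello]"
}
```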

flattenMerge

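flattenMerge takes a flow of flows, collects the inner flows concurrently, and emits their values as a single flow in the order they arrive. It does not pair values the way combine and zip do, and it does not stop when one of the flows completes (unlike zip).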

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val f1 = flowOf(1, 2, 3, 4).onEach { delay(200) }
    val f2 = flowOf("H", "O", "L", "A").onEach { delay(400) }
    val startTime = System.currentTimeMillis()
    flowOf(f1, f2).flattenMerge().collect {
        println("$it at ${System.currentTimeMillis() - startTime} ms from start")
    }
}
/*
1 at 238 ms from start
H at 438 ms from start
2 at 438 ms from start
3 at 639 ms from start
O at 838 ms from start
4 at 839 ms from start
L at 1239 ms from start
A at 1640 ms from start
*/
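To make the contrast with zip concrete, the sketch below runs both operators over one short and one long flow and compares how many values each lets through. The names `shortFlow`, `longFlow`, `mergedValues` and `zippedValues` are hypothetical. The `@OptIn` line is an assumption about the library version in use: flattenMerge is marked as a preview or experimental API in kotlinx.coroutines, and the annotation only silences that warning.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical inputs: one flow with two values, one with four.
private val shortFlow = flowOf(1, 2)
private val longFlow = flowOf("H", "O", "L", "A")

// flattenMerge keeps collecting longFlow after shortFlow has completed.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun mergedValues(): List<Any> =
    flowOf<Flow<Any>>(shortFlow, longFlow).flattenMerge().toList()

// zip completes as soon as shortFlow completes.
suspend fun zippedValues(): List<String> =
    shortFlow.zip(longFlow) { n, s -> "$n$s" }.toList()

fun main() = runBlocking<Unit> {
    println("flattenMerge: ${mergedValues().size} values") // all 2 + 4 values
    println("zip: ${zippedValues().size} values")          // limited by the shorter flow
}
```

Only the counts are compared here, since the order in which flattenMerge interleaves the values of the two flows depends on timing.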
