Módulo 06

En este módulo trataremos:

  • Las Corrutinas en Kotlin y Android
  • Los Flows en Kotlin y en Android

Volvemos brevemente a Kotlin para entender desde el origen qué son las Coroutines y los Flow y cómo se usan nativamente en Kotlin.

Después iremos a Android de nuevo para integrar. Así entenderemos mejor el funcionamiento en Android de Corrutinas y Flows.

¿Por qué ver ahora las corutinas y los Flows?
Porque los usaremos (los flujos) para obtener los cambios que se producen en las bases de datos, así como los datos en sí que hay almacenados en ellas (los recibimos en un flujo de datos).

Corrutinas

La programación asíncrona o sin bloqueo es resuelta en el lenguaje Kotlin mediante una tecnología llamada Coroutines que viene implementada una parte en el lenguaje Kotlin propiamente dicho y otra gran parte mediante una biblioteca de funciones.

La programación asíncrona es necesaria para desarrollar algoritmos que requieren muchos recursos, consultas a servidores de internet, consultas a bases de datos, descarga de archivos grandes etc. con el objetivo de no bloquear el hilo principal de nuestra aplicación y que el usuario se vea impedido de interactuar con el programa hasta que termine de ejecutar el algoritmo.

Las corrutinas permiten no bloquear el hilo principal de la aplicación.
También son indispensables para implementar aplicaciones escalables, podemos tener programas mucho más escalables ejecutando distintas rutinas en forma simultánea en distintos procesadores.

En GitHub Corrutinas (rama starter) tenéis el código starter de un proyecto Android con un Módulo Kotlin (Corrutinas) para poder ir ejecutando lo necesario. Seleccionar la Run Configuration KotlinMain para no ejecutar en el emulador, tal como se ve en la figura.
Veréis en el código que cada uno de los siguientes apartados tiene un bloque de código que puede estar comentado. Tendréis que ir des-comentando y comentando apropiadamente para poder seguir la ejecución de todo, paso a paso.

En el módulo de Corrutinas, en el fichero build.gradle añadir o modificar la dependencia para que funcionen las corrutinas en android studio.

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9'
}

Ejercicio 1

En el fichero MyClass.kt cambiar la función main por la siguiente:

fun main(args: Array<String>) {
    GlobalScope.launch {
        for(x in 1..10) {
            print("$x -")
            delay(1000)
        }
    }
    println("Se bloquea el hilo principal del programa al llamar a readLine")
    readLine()
}

La opción de GlobalScope se entiende como un recurso global que no se encuentra vinculado a ningún job.
Este componente se utiliza en Kotlin con el fin de lanzar las coroutines de nivel superior, caracterizadas por su funcionamiento durante toda la vida útil de la aplicación, lo que además implica que no son canceladas de forma prematura. (veremos que no es buena práctica lanzarlas así en Android, pero aquí estamos en Kotlin y entendiendo qué son las corrutinas)

Como podemos comprobar lo primero que aparece en pantalla es el mensaje que muestra la llamada a println:

println("Se bloquea el hilo principal del programa al llamar a readLine")

Seguidamente bloqueamos el hilo principal de nuestro programa llamando a la función readLine (si el programa finaliza, todas las corrutinas que se hay iniciado finalizan en forma automática):

readLine()

Luego podemos comprobar que comienzan a aparecer en pantalla los números del 1 al 10 de uno en uno, lentamente.

La creación de una corrutina se logra llamando a la función ‘launch’ y pasando una función lambda con el algoritmo que queremos que se ejecute en forma paralela al hilo principal de nuestro programa:

GlobalScope.launch {
    for(x in 1..10) {
        print("$x -")
        delay(1000)
    }
}

Dentro de la función lambda disponemos un for que se repetirá 10 veces y en su interior mostramos el contador y detenemos la ejecución de la corrutina mediante la llamada a la función delay pasando la cantidad de milisegundos a detenerse.

Finalmente hemos puesto un readLine() para que el usuario pulse enter para terminar el hilo principal, de esta forma el hilo principal se espera mientras el usuario no pulse y así la corrutina puede evolucionar. Ya que si se termina el hilo donde se lanzó la corrutina, ésta terminará también.

Ejercicio 2

Ahora vamos a lanzar dos corrutinas: En la primera mostrar los números del 1 al 10 y en la segundo los números del 11 al 20.

    GlobalScope.launch {
        for(x in 1..10) {
            print("$x ")
            delay(1000)
        }
    }
    GlobalScope.launch {
        for(x in 11..20) {
            print("$x ")
            delay(1000)
        }
    }
    readLine()

Notar que las dos corrutinas se ejecutan en forma simultánea, no se requiere terminar la primer corrutina donde se muestran los números del 1 al 10 para que comience la segundo corrutina donde se muestran los números del 11 al 20, como vemos aparecen intercalados los resultados de cada corrutina. Normalmente cada vez que se lance el resultado será diferente, a veces una corrutina adelanta a la otra.

Ejercicio 3

Desarrollar un programa que en el hilo principal genere un número aleatorio entre 1 y 100

Luego una corrutina intentará adivinar el número cada 500ms, obtendrá un valor entre inicio=1 y fin=100, que comparara con el valor del numero generado en el hilo principal, si es mayor o menor, mostrará un mensaje y actualiza sus variables inicio y fin. Así hasta que encuentre el número.

Analizar el código solución siguiente:

import kotlin.random.Random

   fun main(args: Array<String>) {
        val numero = Random.nextInt(1, 100)
        var inicio = 1
        var fin = 100
        GlobalScope.launch {
            var valor:Int
            do {
                valor = Random.nextInt(inicio, fin)
                println(valor)
                if (valor == numero)
                    println("En número es el $valor")
                else
                    if (valor < numero) {
                        println("El numero es mayor")
                        inicio = valor
                    } else {
                        println("El numero es menor")
                        fin = valor
                    }
                delay(500)
            } while (valor != numero)
        }
        readLine() //detenemos el hilo principal del programa
    }

Una corrutina es conceptualmente similar a un hilo (thread) que se implementan en otros lenguajes o inclusive en Kotlin ya que podemos acceder a la clase Thread de Java.
Las corrutinas se pueden considerar como subprocesos livianos (son gestionados por la librería de corrutinas y comparten el hilo que se indique, pero no lo bloquean si se realizan suspensiones en ellas, por ejemplo Delays)

Si el hilo principal de nuestro programa finaliza luego todas las corrutinas en ejecución también finalizan.
Ahora veremos que mediante la llamada a la función runBlocking podemos bloquear nuestro hilo principal de la aplicación hasta que todas las corrutinas finalicen.

Ejercicio 4

Probar a ejecutar el siguiente código

fun main(args: Array<String>) = runBlocking {
    launch {
        delay(1000)
        println("Paso un segundo")
    }
    println("Iniciando")
}

Que también es lo mismo si hacemos:

fun main(args: Array<String>) {
    runBlocking {
        launch {
            delay(1000)
            println("Paso un segundo")
        }
    }
    println("Iniciando")
}

Hemos usado runBlocking que es otra forma de lanzar una corrutina, pero en este caso bloquea el hilo desde el que se lanza hasta que la corrutina termina.
De esta forma nos evitamos poner el readLine() final para que el hilo principal espere la terminación de la corrutina.

Hasta que no finalizan por completo todas las corrutinas que tienen dentro de runBlocking (una en este caso que llamamos mediante la función launch), no finaliza la función main.

launch es un constructor de corrutinas. Lanza una nueva corrutina al mismo tiempo que el resto del código, que continúa funcionando de forma independiente.

Refactorización de funciones.

Cuando queremos mover un algoritmo que contiene una corrutina a otra función, a la misma hay que agregarle el modificador ‘suspend‘.
Veamos los cambios que hay que hacer con el ejemplo anterior:

fun main(args: Array<String>) = runBlocking {
    launch {
        espera()
    }
    println("Iniciando")
}

suspend fun espera() {
    delay(1000)
    println("Pasó un segundo")
}

Las funciones de suspensión se pueden usar dentro de las corrutinas al igual que las funciones normales, pero su característica adicional es que pueden, a su vez, usar otras funciones de suspensión (como delay en este ejemplo) para suspender la ejecución de una corrutina.

La corrutinas son livianas.

A diferencia de los hilos (Thread) las corrutinas requieren muy pocos recursos para su creación y mantenimiento en su ejecución, podemos probar de crear 100000 corrutinas con el siguiente código:

fun main(args: Array<String>) = runBlocking {
    for(x in 1..100000)
        launch {
            delay(1000)
            print(".")
        }
}

Podemos observar que no hay problemas de performance en su ejecución, a pesar de hacer creado 100000 corrutinas. Si intentamos hacer lo mismo creando 100000 hilos (Thread) podremos comprobar que se genera un error en la aplicación.

Manejador que retorna launch

Launch devuelve un manejador, un objeto de tipo Job.

La función launch retorna un objeto de tipo Job que es un identificador de la corrutina iniciada y se puede usar para esperar explícitamente a que se complete.
Dicha acción se hace llamando al método join. Con Join el hilo espera a que la corrutina termine para continuar su ejecución.

fun main(args: Array<String>) = runBlocking {
    val corrutina1=launch {
        delay(1000)
        println("Pasó un segundo")
    }
    corrutina1.join()
    val corrutina2=launch {
        delay(1000)
        println("Pasó otro segundo")
    }
    corrutina2.join()
    println("Finalizado")
}

Aquí hemos secuenciado dos corrutinas.

runBlocking y coroutineScope

Cada constructor de corrutinas (runBlocking, launch,…) define un ámbito (alcance) de ejecución de la corrutina, lo que llamamos scope.

Además del alcance de la corrutina proporcionado por diferentes constructores, es decir, , es posible declarar su propio alcance utilizando el constructor de corrutinas coroutineScope.
Éste crea un alcance de corrutina y no se completa hasta que se completan todos los elementos secundarios iniciados.

Los constructores de corrutinas runBlocking y coroutineScope pueden parecer similares porque ambos esperan que su cuerpo y todos sus elementos secundarios se completen.
La principal diferencia es que el método runBlocking bloquea el hilo actual para esperar, mientras que coroutineScope simplemente suspende, liberando el hilo subyacente para otros usos. Debido a esa diferencia, runBlocking es una función regular y coroutineScope es una función de suspensión.

Se puede utilizar un constructor coroutineScope dentro de cualquier función de suspensión para realizar múltiples operaciones simultáneas.

En el main podemos tener:

    runBlocking {
        Tareas(1)
        Tareas(2)
        println("Fin de todas las tareas")
    }

Y definimos la funcion Tareas, como suspend, puesto que vamos a lanzarla desde una corrutina (creada en el main con runBlocking).
Además dentro de la función Tareas vamos a crear dos corrutinas, pero esta vez con el constructor coroutineScope

suspend fun Tareas(nro:Int) {
    coroutineScope {
        launch {
            delay(1000)
            println("Tarea $nro parte A")
        }
        launch {
            delay(2000)
            println("Tarea $nro parte B")
        }
        println("Esperando finalizar las dos tareas $nro")
    }
}

Nota: En la ejecución aparecen la Parte A antes que la B puesto que hemos forzado esto con los timings….

Es importante notar que cuando llamamos a:

Tareas(1)

La corrutina de la main se bloquea hasta que finaliza la función de suspensión ‘Tareas’, pero dentro de la función Tareas, cuando se llaman a las corrutinas con launch la función de suspensión continua y espera hasta que todas las corrutinas finalicen.

Funciones de suspensión (suspend fun)

Una función de suspensión solo puede ser llamada desde una corrutina o desde otra función de suspensión.

Las funciones de suspensión llamadas desde una corrutina se ejecutan en forma secuencial por defecto, por ejemplo probemos el siguiente código que llama a dos funciones de suspensión:

fun main(args: Array<String>) = runBlocking {
    val d1=dato1()
    println("Fin de la primera función de suspensión")
    val d2=dato2()
    println("Fin de la segundo función de suspensión")
    print(d1+d2)
}

suspend fun dato1(): Int {
    delay(3000)
    return 3
}

suspend fun dato2(): Int {
    delay(3000)
    return 3
}

En muchas situaciones las llamadas secuenciales de las funciones de suspensión son la solución correcta, por ejemplo solicitamos a un servidor un dato y a partir de dicho dato hacemos la petición a otro servidor a partir del dato recuperado del primer servidor.

El tiempo de ejecución de las dos funciones de suspensión es aproximadamente de 6 segundos, esto debido a que se ejecutan en forma secuencial.

Llamadas concurrentes.

En algunas situaciones si el problema lo permite podemos ejecutar las funciones de suspensión en forma concurrente y eventualmente si disponemos de varios procesadores la ejecución se puede hacer en paralelo con la ventaja de reducir el tiempo. Veamos la sintaxis para implementar las llamadas a funciones de suspensión en forma concurrente:

fun main(args: Array<String>) = runBlocking {
    val tiempo1 = System.currentTimeMillis()
    val corrutina1=async { dato1() }
    val corrutina2=async { dato2() }
    println(corrutina1.await()+corrutina2.await())
    val tiempo2 = System.currentTimeMillis()
    println("Tiempo total ${tiempo2-tiempo1}")
}

suspend fun dato1(): Int {
    delay(3000)
    return 3
}

suspend fun dato2(): Int {
    delay(3000)
    return 3
}

En Composing suspending functions tenemos la explicación de la secuencialidad y la concurrencia en la creación de funciones de suspension.

Conceptualmente, async es como el launch.
Inicia una corrutina separada que es un subproceso liviano que funciona simultáneamente con todas las demás corrutinas.
La diferencia es que el launch devuelve un trabajo y no tiene ningún valor resultante, mientras que async devuelve un Deferred (un diferido), un resultado futuro (sin bloqueo) que representa una promesa de proporcionar un resultado más adelante. Es decir, que async no bloquea el hilo, continúa su ejecución pero el resultado que devuelve, ya vendrá…
Puede usar .await() en un valor diferido (devolución de async) para obtener su resultado final, pero un Deferred, también es un Job, por lo que puede cancelarlo si es necesario.

Lazily started async

Opcionalmente, async se puede hacer lazy (retardado, perezoso), configurando su parámetro de inicio en CoroutineStart.LAZY.
En este modo, sólo inicia la rutina cuando await requiere su resultado o si se invoca la función de start del Job. Mira el siguiente ejemplo:

fun main(args: Array<String>) = runBlocking {
       val time = measureTimeMillis {
           val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
           val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
           // some computation
           one.start() // start the first one
           two.start() // start the second one
           println("The answer is ${one.await() + two.await()}")
       }
       println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

Corrutinas Kotlin en Android

Un pequeño previo primero, como veremos los Flows son Secuencias Asíncronas, por lo que es interesante entender qué son las secuencias, ya que además pueden optimizar el tiempo de ejecución respecto a usar colecciones: Secuencias en Kotlin

Una vez visto que son las secuencias vamos con las corrutinas. Os dejo un video explicativo de resumen y lo amplio en la descripción a continuación. Al final en el apartado de referencias del apartado de corrutinas tenéis más información para ampliar y complementar lo aquí expuesto.

Una de las grandes ventajas para el programador al hacer uso de corrutinas y las funciones suspend es que la lógica o el flujo de nuestro programa se asemeja a una programación lineal, donde una linea llama a una función y obtiene un resultado, la linea siguiente usa ese resultado, pero el resultado se ha obtenido en una operación asíncrona. De esta forma no tenemos que definir funciones callback tan habituales en la programación con Android Views. Ahora con Compose y Corrutinas, aunque inicialmente es mas lío si conoces el funcionamiento anterior, todo se simplifica y finalmente es más intuitivo.

Corrutinas Kotlin en 5 minutos, aplicadas en Android Conceptos que se tratan en el video son (ampliados):

  • Tenemos varios Scopes, Application, Activity, ViewModel, … es importante diferenciar en qué ámbito definimos/lanzamos la corrutina.
    • Application Scope -> GlobalScope. La corrutina en este scope estará viva durante todo el ciclo de vida de la Application (Objeto Singleton Application)
  • Una vez definido el Scope usamos un Builder de Corrutina. Por ejemplo launch (o async).
  • Las funciones de suspensión bloquean la ejecución de la corrutina, pero no el hilo del scope en el que estamos.
  • Que la función de suspensión bloquee o no el hilo del scope depende del Dispatcher utilizado.
    Hay distintos Dispatchers (o Schedulers) asociados a las corrutinas. Cada Scope tieen un dispatcher por defecto.
    El Dispatcher.default es el por defecto para el GlobalScope.
    Este dispatcher coloca la corrutina en un hilo diferente del hilo principal.
    En Kotlin, todas las corrutinas se deben ejecutar en un despachador, incluso cuando se ejecutan en el subproceso principal.
    La corrutinas se pueden suspender a sí mismas, y el despachador es responsable de reanudarlas.
    Para especificar en qué lugar deberían ejecutarse las corrutinas, Kotlin proporciona tres despachadores que puedes utilizar:
    • Dispatcher.Main: Utiliza este despachador para ejecutar una corrutina en el subproceso de Android principal.
      Solo debes usar este despachador para interactuar con la IU y realizar trabajos rápidos.
    • Dispatcher.Default: Este despachador está optimizado para realizar trabajo que usa la CPU de manera intensiva fuera del subproceso principal. Puede usar tantos subprocesos como cores tenga la CPU. Ya que estas son tareas intensivas, no tiene sentido tener más ejecuciones al mismo tiempo, porque la CPU estará ocupada. Algunos casos prácticos de ejemplo son clasificar una lista y analizar JSON.
    • Dispatcher.IO: Este despachador está optimizado para realizar E/S de disco o red fuera del subproceso principal. ya que no usan la CPU, se puede tener muchas en ejecución al mismo tiempo. Las Apps de Android, lo que más hacen, es interactuar con el dispositivo y hacer peticiones de red, por lo que probablemente usarás este la mayoría del tiempo.
      Algunos ejemplos incluyen usar el componente Room, leer desde archivos o escribir en ellos, y ejecutar operaciones de red.
  • Lo que querremos en Android es que la corrutina se ejecute en el hilo principal, pero que las funciones de suspensión que se lancen desde la corrutina se ejecuten en hilos separados.
    Esto es así para que la corrutina tenga acceso a todos los componentes del hilo principal y pueda actualizar datos de éstos con los resultados que vienen de las funciones de suspensión.
  • Con withContext(Dispatcher){código} podemos decidir que parte de código dentro de una corrutina se ejecuta el scope asociado al dispatcher que se pasa como parámetro.
    withContext() devuelve el retorno que tenga la función o código dentro de el, bloqueando la corrutina hasta que obtiene el resultado, pero liberando el hilo principal.
    Veamos el ejemplo (usado en Application):
GlobalScope.launch(Dispatcher.Main){ //this:CoroutineScope
  val result = withContext(Dispatcher.IO){
                  DataProvider.DoHeavyTask()
               }
  println(result)
}
  • Cuando queremos realizar algo a nivel de Activity necesitamos un Scope a nivel de Activity para que cuando muera la activity, el proceso muera también.
    El objeto CoroutineScope realiza un seguimiento de cualquier corrutina que crea mediante los elementos launch o async.
    Android proporcina para la Activity el lifecycleScope y para el ViewModel el viewModelScope
    Veamos el ejemplo (usado en Activity y en ViewModel): (No hace falta indicar el Dispatcher.Main en el launch porque por defecto usan ese.
//Para usar en una clase Activity
lifecycleScope.launch{ //this:CoroutineScope
  val result = withContext(Dispatcher.IO){
                  DataProvider.DoHeavyTask()
               }
  println(result)
}

//Para usar en una clase ViewModel
viewModelScope.launch{ //this:CoroutineScope
  val result = withContext(Dispatcher.IO){
                  DataProvider.DoHeavyTask()
               }
  println(result)
}
  • Builders:
    • runBlocking: Este builder bloquea el hilo actual hasta que se terminen todas las tareas dentro de esa corrutina.
      Esto va en contra de lo que queremos lograr con las corrutinas. Entonces, ¿para qué sirve?
      Es muy útil para implementar tests sobre suspending tasks. En tus tests, envuelve la suspending task que desea probar con una llamada runBlocking y podrás asertar sobre el resultado y evitar que el test finalice antes de que finalice la tarea en segundo plano.
      O para probar como hemos hecho anteriormente en el proyecto Kotlin.
      No se usará habitualmente.
    • launch: Este es el builder más usado.
      Lo utilizarás mucho porque es la forma más sencilla de crear corrutinas.
      A diferencia de runBlocking, no bloqueará el subproceso actual (si usamos los dispatchers adecuados, claro).
      Este builder siempre necesita un scope.
      launch devuelve un Job, que es otra clase que implementa CoroutineContext.
      Los jobs tienen un par de funciones interesantes que pueden ser muy útiles, las vemos más abajo.
      Pero es importante entender que un Job puede tener a su vez otro Job padre.
      Ese job padre tiene cierto control sobre los hijos, y ahí es donde entran en juego estas funciones (Join y Cancel).
    • async: async permite ejecutar varias tareas en segundo plano en paralelo.
      No es una función de suspensión en sí misma, por lo que cuando ejecutamos async, el proceso en segundo plano se inicia, pero la siguiente línea se ejecuta de inmediato.
      async siempre debe llamarse dentro de otra corrutina y devuelve un job especializado que se llama Deferred.
      Este objeto Deferred tiene una nueva función llamada await() que es la que bloquea.
      Llamaremos a await() solo cuando necesitemos el resultado.
      Si el resultado aún no esta listo, la corrutina se suspende en ese punto.
      Si ya tenemos el resultado, simplemente lo devolverá y continuará.
      De esta manera, puedes ejecutar tantas tareas en segundo plano como necesites.

Jobs y funciones

  • El elemento Job es un controlador de corrutinas.
    Cada corrutina que creas con los objetos launch o async muestra una instancia de Job que identifica de forma única la corrutina y administra su ciclo de vida.
  • job.join(): Con está función, puedes bloquear la corrutina asociada con el job hasta que todos los jobs hijos hayan finalizado.
    Todas las funciones de suspension que se llaman dentro de una corrutina están vinculadas a job, así que el job puede detectar cuándo finalizan todos los jobs hijos y después continuar la ejecución.
    job.join() es una función de suspensión en sí misma, por lo que debe llamarse dentro de otra corrutina.
val job = GlobalScope.launch(Dispatchers.Main) {
    doCoroutineTask()
    val res1 = suspendingTask1()
    val res2 = suspendingTask2()
    process(res1, res2)
}
job.join()
  • job.cancel(): Esta función cancelará todos sus jobs hijos asociados.
    Así que, si por ejemplo mientras se está ejecutando suspendingTask1() se llama a cancel(), este no devolverá el valor a res1 y suspendingTask2() no se ejecutará nunca. Recordar que como no se ha usado async las suspendingTasks del ejemplo van en secuencial.
    job.cancel() esta es una función normal, por lo que no requiere una corrutina para ser llamada.

Algunas referencias

Flujos de Kotlin

Un flujo es un tipo de corrutina que puede emitir varios valores de manera secuencial, en lugar de las funciones de suspensión que pueden retornar un único valor.

Un flujo conceptualmente es una transmisión de datos que se puede efectuar de forma asíncrona.

Los valores emitidos deben ser del mismo tipo.
Por ejemplo, un Flow de enteros es un flujo que emite valores enteros, pero pueden ser de cualquier otro tipo: String, Float, un data class Persona etc.

Un flujo puede enviar de forma segura una solicitud de red para producir el siguiente valor (del flujo) sin bloquear el subproceso principal de la aplicación y evitar su bloqueo.

Los Flows no son más que unas colecciones lazy, es decir, hace falta una operación terminal que pida el valor del flow, obtener un resultado, la
secuencia como tal no se va a procesar.

Eso nos aporta varias ventajas:

  • La primera de ellas es que no se van a generar datos que no necesitamos
  • Y otra ventaja es que podemos crear una secuencia infinita de tal forma que se pueden hacer cosas que con colecciones normales como una lista por ejemplo no sería posible.

La mayor diferencia entre un flow y una secuencia es que el flow es asíncrono, es decir, no necesita generar los todos los resultados en el mismo momento en el que se pide. En una secuencia cuando nosotros hacemos una operación terminal todos los valores de esa secuencia se generan en ese momento, recuperamos el resultado y hacemos con él lo que queramos.

En los flows son asíncronos, es decir, las peticiones al flow pueden no ser generadas inmediatamente y llevar su tiempo obtenerlas.
De hecho un flow se puede quedar indefinidamente esperando resultados.

Los Flows corren en el contexto de una Corrutina y por tanto podemos lanzar funciones de suspensión dentro de los flow que nos generen los resultados que nosotros necesitemos.

Los Flows por defecto son Lazy o Cold Streams, es decir, hasta que alguien no se conecta para recibir resultados, estos resultados no se generan.
Un Flow no hace nada hasta que alguien empieza a escuchar los valores del mismo.

Recuperar datos de un Flow se denomina Recolectar o Collect.

Si hay otro recolector que se contecta con el mismo Flow en otro momento, va a volver a recibir toda la secuencia desde el principio, no en el punto en que el primer recolector estaba recolectando. Esto es el funcionamiento estadar y se puede configurar de otra forma.

Continuamos con el proyecto Corrutinas para añadir código para los Flows:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow

data class Persona(val nombre: String, val edad: Int)


fun retornarPersona(): Flow<Persona> = flow {
    val lista = listOf(
        Persona("diego", 53),
        Persona("juan", 33),
        Persona("ana", 33)
    )
    for (elemento in lista) {
        delay(1000)
        emit(elemento)
    }
}

fun main() {
    runBlocking {
        retornarPersona().collect(){ //it:Persona
            println("${it.nombre} ${it.edad}")
        }
    }
}

Flow es una interface y mediante la llamada a la función flow pasamos una función lambda, donde mediante la llamada de la función emit retornamos el dato que será procesado desde la llamada a la función collect.

Como collect es una función de suspensión, la misma debe llamarse desde un bloque de una corrutina, en nuestro caso dentro de runBlocking.

Podemos imaginar la recuperación de datos de personas que se encuentran en un servidor que requiere un tiempo no trivial.

Podemos hacer la llamada a la función ‘retornarPersona‘ en forma asíncrona para seguir con la ejecución del hilo principal del programa y no tener que esperar la recuperación de todos los datos:

fun main() {
    runBlocking {
        async { 
           retornarPersona().collect(){ //it:Persona
               println("${it.nombre} ${it.edad}")
           }
        }
    }
   println("Fin de main")
}

.asFlow()

Permite generar un Flow a partir de una lista.
En el siguiente ejemplo vemos como de una lista de enteros se ha generado un flow y cómo se recolecta.

//asFlow()
fun main() {
  runBlocking {
    makeFlow().collect(){
        println(it)
    }
    //Otra forma de llamar de forma que para cada elemento se ejecuta una función
    //es utilizar una referencia a la función en la llamada.
    makeFlow().collect(::println)
  }
  println("Fin de main")
}


fun makeFlow(): Flow<Int> {
    return listOf(1,2,3,4,5).asFlow()
}

Generando un Flow

Podemos crear un flow<T> genérico a partir de ieteradores o bucles como en el siguiente ejemplo:

Con la función emit(item) emitimos al flow cada elmento.

//Generando un Flow
fun main() {
  runBlocking {
    makeFlow2().collect(){
        println(it)
    }
    //Otra forma de llamar de forma que para cada elemento se ejecuta una función
    //es utilizar una referencia a la función en la llamada.
    makeFlow().collect(::println)
  }
  println("Fin de main")
}

fun makeFlow2(): Flow<Int> {
    return flow<Int>{
        for (i in 1..10){
            emit(i)
        }
    }
}

Tal como están estos dos ejemplos anteriores no aportan nada respecto a iterar por una colección.

Pero con los flows, al estar en el contexto de una corrutina podemos utilizar una función de suspensión que nos de los datos desde por ejemplo un servidor o una base de datos. El código se vería asi:

//Generando un Flow
fun main() {
    runBlocking {
        makeFlow3().collect(){
            println(it)
        }
    }

  println("Fin de main")
}


fun makeFlow3(): Flow<Int> {
    return flow<Int>{
        for (i in 1..10){
            val data = GetAsyncData()
            emit(data)
        }
    }
}

suspend fun GetAsyncData(): Int {
    return withContext(Dispatchers.IO){
        //simulamos la ejecución en el servidor
        delay(1000)
        Random.nextInt(1, 100)
    }
}

Operaciones intermedias

Podemos obtener dados del flow, pero tratarlos como hacíamos con las colecciones, aplicar filtros, procesamientos, etc.. e ir encadenando operaciones hasta finalmente tener lo esperado.

Por ejemplo aplicamos un .filter para quedarnos con los pares. (usamos GetAsyncData con un delay de 500 para hacerlo más rápido y aumentamos el numero de veces que lo llamamos a 30)
También usamos un map para transformar(usar) el dato y sacar un string.

//Operacioines intermedias
fun main() {
    runBlocking {
        makeFlow3()
            .filter { it % 2 == 0  } //Nos quedamos solo con los pares
            .map{
                "Obtenido el par: $it"
            }
            .collect(){
                println(it)
                }
    }


  println("Fin de main")
}


fun makeFlow3(): Flow<Int> {
    return flow<Int>{
        for (i in 1..10){
            val data = GetAsyncData()
            emit(data)
        }
    }
}

suspend fun GetAsyncData(): Int {
    return withContext(Dispatchers.IO){
        //simulamos la ejecución en el servidor
        delay(1000)
        Random.nextInt(1, 100)
    }
}

Con las colecciones, cada operación intermedia generaba otra colección con el resultado de la operación.

Con los flows esto no ocurre, finalmente tenemos un único Flow que es el resultante de haber aplicado todas las operaciones intermedias y es realmente a ese flow final al que nos enganchamos para recolectar con collect().
Esto hace que el procesamiento sea más rápido y liviano que con las colecciones. Realmente es lo mismo que con las secuencias, pero los flows son asíncronos.

Tipos especializados de Flows

Se han creado ciertos tipos avanzados de flows que permiten operaciones, en contextos deteerminados de la programación, más fáciles de realizar que utilizando el flow básico.

Estos son :

  • StateFlow
  • SharedFlow
  • Channel
  • CallbackFlow

SateFlow

A diferencia de un flow normal, que denominábamos Cold Flow, un StateFlow es un flujo que está continuamente generando valores, aunque nadie los lea, por eso se les denomina Hot Flow.

Cuando pedimos un valor al StateFlow, no nos da todos los valors, sino únicamente el último valor, el estado actual del flujo.

Nos servirán para resolver la suscripción a un estado de nuestra aplicación. Nos interesa conocer sólo el estado actual, no los valores anteriores por los que ha pasado el estado.

En el momento de la suscripción al StateFlow se nos entrega el último valor del estado. Veremos más adelante que es un caso especial de SharedFlow.

En el proyecto Corrutinas creamos una clase que nos simula el estado, la llamamos ViewState simulando el estado de una vista.

Creamos una variable privada _state que la asignamos a un StateFlow o a un MutableStateFlow. La diferencia es que los valores del MutableStateFlow podemos cambiarlos. Los StateFlows (mutables o no) deben estar siempre inicializados. En este caso lo inicializamos a 1.

Creamos la variable privada para que nuestra clase (las funciones de la misma) la puedan modificar, pero ofertamos una variable pública inmutable para que los recolectores no puedan cambiarla. Convertimos la mutable en inmutable con .asStateFlow() o bien haciendo un getter con val (inmutable)

Además vemos que como un StateFlow únicamente tiene un valor (value) no necesitamos hacer emit(), simplemente modificando la propiedad .value actualziamos el único valor, el último.

class ViewState(){
    private val _state = MutableStateFlow(1)
    val state
        get() = _state

    suspend fun startUpdating(){
        while(true){
            delay(2000)
            _state.value = _state.value +1
        }
    }
}

Ahora en nuestro main, en el runBlocking vamos a ponernos a escuchar ese estado.

Lo que hacemos es crear una variable de nuestra clase ViewState.
Le decimos a nuestra instancia que comience a emitir valores llamando a su método startUpdating(). Pero esto tenemos que hacerlo en una corrutina, ya que la función startUpdating es un bucle sin fin y por tanto si no lo ponemos en una corrutina, nunca podríamos avanzar a la siguiente línea (la de collect)
En esta siguiente línea, accedemos al estádo público (que es un StateFlow) y nos ponemos a recolectar con collect(::print) (imprimimos el it)

Podemos poner un delay para que no empiece a recolectar pasados unos segundos, con lo que veremos que el estado ha ido evolucionando (cada 2 segundos) a nuevos valores aunque no los hayamos recolectado.

Nuestro bloque principal quedaría asi:

//StateFlow
    runBlocking {
        val viewState = ViewState()
        launch{
            viewState.startUpdating()
        }
        delay(5000) //Comenzamos a recolectar pasados 5 segundos
        viewState.state.collect(::println)
    }

Shared Flows

Domina los Flows en Kotlin (Shared Flows)

Channels

Domina los Flows en Kotlin (Channels)

Callback Flows

Domina los Flows en Kotlin (Callback Flows)

Flows en Kotlin aplicados a Android

Video   Kotlin Flows in practice

Arquitectura basada en Flows de datos desde diversas fuentes, gestionadas por el Repositorio

Recursos y referencias: