Módulo 06

En este módulo trataremos:

  • Las Corrutinas en Kotlin y Android
  • Los Flows en Kotlin y en Android

Volvemos brevemente a Kotlin para entender desde el origen qué son las Coroutines y los Flow y cómo se usan nativamente en Kotlin.

Después iremos a Android de nuevo para integrar. Así entenderemos mejor el funcionamiento en Android de Corrutinas y Flows.

¿Por qué ver ahora las corutinas y los Flows?
Porque los usaremos (los flujos) para obtener los cambios que se producen en las bases de datos, así como los datos en sí que hay almacenados en ellas (los recibimos en un flujo de datos).

Corrutinas

La programación asíncrona o sin bloqueo es resuelta en el lenguaje Kotlin mediante una tecnología llamada Coroutines que viene implementada una parte en el lenguaje Kotlin propiamente dicho y otra gran parte mediante una biblioteca de funciones.

La programación asíncrona es necesaria para desarrollar algoritmos que requieren muchos recursos, consultas a servidores de internet, consultas a bases de datos, descarga de archivos grandes etc. con el objetivo de no bloquear el hilo principal de nuestra aplicación y que el usuario se vea impedido de interactuar con el programa hasta que termine de ejecutar el algoritmo.

Las corrutinas permiten no bloquear el hilo principal de la aplicación.
También son indispensables para implementar aplicaciones escalables, podemos tener programas mucho más escalables ejecutando distintas rutinas en forma simultánea en distintos procesadores.

En GitHub Corrutinas (rama starter) tenéis el código starter de un proyecto Android con un Módulo Kotlin (Corrutinas) para poder ir ejecutando lo necesario. Seleccionar la Run Configuration KotlinMain para no ejecutar en el emulador, tal como se ve en la figura.
Veréis en el código que cada uno de los siguientes apartados tiene un bloque de código que puede estar comentado. Tendréis que ir des-comentando y comentando apropiadamente para poder seguir la ejecución de todo, paso a paso.

En el módulo de Corrutinas, en el fichero build.gradle añadir o modificar la dependencia para que funcionen las corrutinas en android studio.

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9'
}

Ejercicio 1

En el fichero MyClass.kt cambiar la función main por la siguiente:

fun main(args: Array<String>) {
 //EJERCICIO 1
    log("Inicio del programa");
    GlobalScope.launch {
        log("Inicio de la corrutina")
        for(x in 1..10) {
            print("$x -")
            delay(1000)
        }
    }
    log("Se bloquea el hilo principal del programa al llamar a readLine")
    readLine()

}

fun log(message: String) {
    println("[${Thread.currentThread().name}] : $message")
}

fun log(character: Char) {
    print("$character")
}

La opción de GlobalScope se entiende como un recurso global que no se encuentra vinculado a ningún job y genera un nuevo hilo de ejecución global.
Este componente se utiliza en Kotlin con el fin de lanzar las coroutines de nivel superior, caracterizadas por su funcionamiento durante toda la vida útil de la aplicación, lo que además implica que no son canceladas de forma prematura. (veremos que no es buena práctica lanzarlas así en Android, pero aquí estamos en Kotlin y entendiendo qué son las corrutinas)

Como podemos comprobar lo primero que aparece en pantalla es el mensaje que muestra las llamadas a log:

log("Se bloquea el hilo principal del programa al llamar a readLine")

Seguidamente bloqueamos el hilo principal de nuestro programa llamando a la función readLine (si el programa finaliza, todas las corrutinas que se hay iniciado finalizan en forma automática):

readLine()

Luego podemos comprobar que comienzan a aparecer en pantalla los números del 1 al 10 de uno en uno, lentamente.

La creación de una corrutina se logra llamando a la función ‘launch’ y pasando una función lambda con el algoritmo que queremos que se ejecute en forma paralela al hilo principal de nuestro programa:

GlobalScope.launch {
    for(x in 1..10) {
        print("$x -")
        delay(1000)
    }
}

Dentro de la función lambda disponemos un for que se repetirá 10 veces y en su interior mostramos el contador y detenemos la ejecución de la corrutina mediante la llamada a la función delay pasando la cantidad de milisegundos a detenerse.

Finalmente hemos puesto un readLine() para que el usuario pulse enter para terminar el hilo principal, de esta forma el hilo principal se espera mientras el usuario no pulse y así la corrutina puede evolucionar. Ya que si se termina el hilo donde se lanzó la corrutina, ésta terminará también. Es decir, si mientras se está mostrando la cuenta el usuario pulsa enter, como el hilo principal estaba esperando la entrada de usuario, la lee y termina, terminando por tanto la corrutina.

Ejercicio 2

Ahora vamos a lanzar dos corrutinas: En la primera mostrar los números del 1 al 10 y en la segunda los números del 11 al 20.

 // EJERCICIO 2
    GlobalScope.launch {
        log("Inicio de la corrutina 1")
        for(x in 1..10) {
            print("$x ")
            delay(1000)  //sleep(1000)
        }
    }
    GlobalScope.launch {
        log("Inicio de la corrutina 2")
        for(x in 11..20) {
            print("$x ")
            sleep(1000) //delay(1000)
        }
    }
    readLine()

Notar que las dos corrutinas se ejecutan en forma simultánea, no se requiere terminar la primer corrutina donde se muestran los números del 1 al 10 para que comience la segundo corrutina donde se muestran los números del 11 al 20, como vemos aparecen intercalados los resultados de cada corrutina. Normalmente cada vez que se lance el resultado será diferente, a veces una corrutina adelanta a la otra. Cada corrutina se lanza en un thread diferente de nivel global.

En este caso podemos utilizar sleep o delay. Sleep es una función externa del sistema, realmente es Thread.sleep() que para el hilo de ejecución. Pero delay() es una función de suspensión, es una función que suspende (para) la ejecución de una corrutina, no del thread en la que corre la corrutina. Como en este ejemplo tenemos dos corrutinas, pero que se han lanzado con Globalscope, están ejecutando en threads distintos, y por eso en este caso el resultado de ejecución es indistinto al usar delay o sleep.

Ejercicio 3

Desarrollar un programa que en el hilo principal genere un número aleatorio entre 1 y 100

Luego una corrutina intentará adivinar el número cada 500ms, obtendrá un valor entre inicio=1 y fin=100, que comparara con el valor del numero generado en el hilo principal, si es mayor o menor, mostrará un mensaje y actualiza sus variables inicio y fin. Así hasta que encuentre el número.

Analizar el código solución siguiente:

import kotlin.random.Random

   fun main(args: Array<String>) {
 //EJERCICIO 3
    log("Ejecución en el hilo principal. Adivina el número entre 1 y 100 ")
    val numero = Random.nextInt(1, 100)
    var inicio = 1
    var fin = 100
    GlobalScope.launch {
        var valor:Int
        log("Inicio de la corrutina adivinadora")
        do {
            valor = Random.nextInt(inicio, fin)
            println(valor)
            if (valor == numero)
                println("En número es el $valor")
            else
                if (valor < numero) {
                    println("El numero es mayor")
                    inicio = valor
                } else {
                    println("El numero es menor")
                    fin = valor
                }
            delay(500)
        } while (valor != numero)
    }
    readLine() //detenemos el hilo principal del programa
    }

Una corrutina es conceptualmente similar a un hilo (thread) que se implementan en otros lenguajes o inclusive en Kotlin ya que podemos acceder a la clase Thread de Java.
Las corrutinas se pueden considerar como subprocesos livianos (son gestionados por la librería de corrutinas y comparten el hilo que se indique, pero no lo bloquean si se realizan suspensiones en ellas, por ejemplo Delays.

Si el hilo principal de nuestro programa finaliza luego todas las corrutinas en ejecución también finalizan.
Ahora veremos que mediante la llamada a la función runBlocking podemos bloquear nuestro hilo principal de la aplicación hasta que todas las corrutinas finalicen.

Ejercicio 4

Probar a ejecutar el siguiente código

fun main(args: Array<String>) = runBlocking {
    //Para probar esto hay que añadir " = runBlocking" en la función main
    log("Running in the main thread")
    launch {
        delay(1000)
        log("Paso un segundo")
        }
    log("After runBlocking")
    println("Fin del Main")
}

La salida es:

[main] : Running in the main thread
[main] : After runBlocking
Fin del main
[main] : Paso un segundo

Process finished with exit code 0

Que también es lo mismo si hacemos:

fun main(args: Array<String>) {
    //Con RunBlocking
    //Prueba a ejecutar en el scope prinpical. Quitar el runBlocking de la función main
    log("Running in the main thread")
    runBlocking {
        launch {
            delay(1000)
            log("Paso un segundo")
        }
    }
    log("After runBlocking")
    println("Fin del Main")
}

Hemos usado runBlocking que es otra forma de lanzar una corrutina, pero en este caso bloquea el hilo desde el que se lanza hasta que la corrutina termina. Aquí, no se crea un nuevo hilo para la corrutina, sino que se ejecuta en el hilo en el que se llama a runBlocking, en este caso [Main]. La salida es:

[main] : Running in the main thread
[main] : Paso un segundo
[main] : After runBlocking
Fin del main

Process finished with exit code 0

De esta forma nos evitamos poner el readLine() final para que el hilo principal espere la terminación de la corrutina.

Si nos fijamos en la ejecución:
En el primer código, runBlocking se utiliza como un bloque de construcción de la función main. Esto significa que la función main se ejecutará en el contexto de la corrutina proporcionada por runBlocking. En este caso, runBlocking no bloquea el hilo principal, sino que simplemente proporciona un contexto de corrutina para la función main. Por lo tanto, cuando se lanza la corrutina con launch, se ejecuta de forma asíncrona con respecto al hilo principal, lo que permite que el hilo principal continúe ejecutándose. Esto es lo que permite que “Fin del Main” se imprima antes de “Paso un segundo”.
En el segundo código, runBlocking se utiliza dentro de la función main. En este caso, runBlocking bloquea el hilo principal hasta que se completa la corrutina que se lanza dentro de él. Por lo tanto, “Paso un segundo” se imprime antes de que el hilo principal pueda continuar y imprimir “Fin del Main”.

runBlocking se utiliza para bloquear el hilo actual hasta que se complete la corrutina que se lanza dentro de él. Si se utiliza como bloque de construcción de la función main, proporciona un contexto de corrutina para la función main pero no bloquea el hilo principal.

Hasta que no finalizan por completo todas las corrutinas que tienen dentro de runBlocking (una en este caso que llamamos mediante la función launch), no finaliza la función main.

launch es un constructor de corrutinas. Lanza una nueva corrutina al mismo tiempo que el resto del código, que continúa funcionando de forma independiente.

Refactorización de funciones.

Cuando queremos pasar un algoritmo contenido en una corrutina a una función y llamarla desde la corrutina, en vez de poner el código del algoritmo dentro de la corrutina, entonces, a la función donde ponemos el código hay que agregarle el modificador ‘suspend‘.
Veamos los cambios que hay que hacer con el ejemplo anterior:

fun main(args: Array<String>) {
 //Refactorización de funciones.
    log("Running in the main thread")
    runBlocking {
        launch {
            log("En el hilo de la corrutina")
            espera() //Esta debera ser una función suspendida por estar dentro de una corrutina
        }
    }
    log("After runBlocking")

}
suspend fun espera() {
    delay(1000) //esto tambien es una función de suspension
    println("Pasó un segundo")
}

Las funciones de suspensión se pueden usar dentro de las corrutinas al igual que las funciones normales, pero su característica adicional es que pueden, a su vez, usar otras funciones de suspensión (como delay en este ejemplo) para suspender la ejecución de una corrutina.

La corrutinas son livianas.

A diferencia de los hilos (Thread) las corrutinas requieren muy pocos recursos para su creación y mantenimiento en su ejecución, podemos probar de crear 100000 corrutinas con el siguiente código:

fun main(args: Array<String>) {
 //La corrutinas son livianas.
    log("Running in the main thread")
    runBlocking {
        for (x in 1..100000)
            launch {
                delay(1000)
                print(".")
            }
    }
    log("After runBlocking")

}

Podemos observar que no hay problemas de rendimiento en su ejecución, a pesar de hacer creado 100000 corrutinas. Si intentamos hacer lo mismo creando 100000 hilos (Thread) podremos comprobar que se genera un error en la aplicación.

Manejador que retorna launch

Launch devuelve un manejador, un objeto de tipo Job.

La función launch retorna un objeto de tipo Job que es un identificador de la corrutina iniciada y se puede usar para esperar explícitamente a que se complete.
Dicha acción se hace llamando al método join. Con Join el hilo espera a que la corrutina termine para continuar su ejecución.

////Manejador que retorna launch
    log("Running in the main thread")
    runBlocking {
        val corrutina1=launch {
            log("En la corrutina 1")
            delay(1000)
            log("Pasó un segundo")
        }
        //corrutina1.join()
        val corrutina2=launch {
            log("En la corrutina 2")
            delay(1000)
            log("Pasó otro segundo")
        }
        //corrutina2.join()
    }
    log("After runBlocking")

Aquí hemos podemos secuenciar dos corrutinas si utilizamos .join() con el majenador devuelto por launch.

Si comentamos las lineas con .join() veremos que comienzan a la vez y terminan también a la vez (porque los delays son el mismo)

runBlocking y coroutineScope

Cada constructor de corrutinas (runBlocking, launch,…) define un ámbito (alcance o scope) de ejecución de la corrutina.

Es posible declarar su propio alcance utilizando el constructor de corrutinas coroutineScope. Éste crea un alcance de corrutina y no se completa hasta que se completan todos los elementos secundarios iniciados.

Los constructores de corrutinas runBlocking y coroutineScope pueden parecer similares porque ambos esperan que su cuerpo y todos sus elementos secundarios se completen.
La principal diferencia es que el método runBlocking bloquea el hilo actual para esperar, mientras que coroutineScope simplemente suspende, liberando el hilo subyacente para otros usos. Debido a esa diferencia, runBlocking es una función regular y coroutineScope es una función de suspensión.

Se puede utilizar un constructor coroutineScope dentro de cualquier función de suspensión para realizar múltiples operaciones simultáneas.

En el main podemos tener:

 //runBlocking y coroutineScope
    runBlocking {
        Tareas(1)
        Tareas(2)
        log("Fin de todas las tareas")
    }

Y definimos la función Tareas, como suspend, puesto que vamos a lanzarla desde una corrutina (creada en el main con runBlocking).
Además dentro de la función Tareas vamos a crear dos corrutinas, pero esta vez con el constructor coroutineScope

suspend fun Tareas(nro:Int) {
    coroutineScope {
        launch {
            log("Tarea $nro parte A. iniciando...")
            delay(1000)
            log("Tarea $nro parte A. finalizada")
        }
        launch {
            log("Tarea $nro parte B. iniciando...")
            delay(2000)
            log("Tarea $nro parte B. finalizada")
        }
        log("Esperando finalizar las dos partes de las tareas $nro")
    }
}

Nota: En la ejecución aparecen la Parte A antes que la B puesto que hemos forzado esto con los timings….

Es importante notar que cuando llamamos a:

Tareas(1)

La corrutina de la main se bloquea hasta que finaliza la función de suspensión ‘Tareas’, pero dentro de la función Tareas, cuando se llaman a las corrutinas con launch la función de suspensión continua y espera hasta que todas las corrutinas finalicen.

Funciones de suspensión (suspend fun)

Una función de suspensión solo puede ser llamada desde una corrutina o desde otra función de suspensión.

Las funciones de suspensión llamadas desde una corrutina se ejecutan en forma secuencial por defecto, por ejemplo probemos el siguiente código que llama a dos funciones de suspensión:

fun main(args: Array<String>) = runBlocking {
    val d1=dato1()
    log("Fin de la primera función de suspensión")
    val d2=dato2()
    log("Fin de la segundo función de suspensión")
    print(d1+d2)
}

suspend fun dato1(): Int {
    delay(3000)
    return 3
}

suspend fun dato2(): Int {
    delay(3000)
    return 3
}

En muchas situaciones las llamadas secuenciales de las funciones de suspensión son la solución correcta, por ejemplo solicitamos a un servidor un dato y a partir de dicho dato hacemos la petición a otro servidor a partir del dato recuperado del primer servidor.

El tiempo de ejecución de las dos funciones de suspensión es aproximadamente de 6 segundos, esto debido a que se ejecutan en forma secuencial.

Llamadas concurrentes.

En algunas situaciones si el problema lo permite podemos ejecutar las funciones de suspensión en forma concurrente y eventualmente si disponemos de varios procesadores la ejecución se puede hacer en paralelo con la ventaja de reducir el tiempo. Veamos la sintaxis para implementar las llamadas a funciones de suspensión en forma concurrente:

fun main(args: Array<String>) = runBlocking {
// LLamadas concurrentes
    runBlocking {
        val tiempo1 = System.currentTimeMillis()
        val corrutina1=async {
            log("Iniciando la corrutina 1")
            dato1()
        }
        val corrutina2=async {
            log("Iniciando la corrutina 2")
            dato2()
        }
        println(corrutina1.await()+corrutina2.await())
        val tiempo2 = System.currentTimeMillis()
        println("Tiempo total ${tiempo2-tiempo1} ms")
    }

}

suspend fun dato1(): Int {
    delay(3000)
    return 3
}

suspend fun dato2(): Int {
    delay(3000)
    return 3
}

En Composing suspending functions tenemos la explicación de la secuencialidad y la concurrencia en la creación de funciones de suspension.

Conceptualmente, async es como el launch.
Inicia una corrutina separada que es un subproceso liviano que funciona simultáneamente con todas las demás corrutinas.
La diferencia es que el launch devuelve un trabajo y no tiene ningún valor resultante, mientras que async devuelve un Deferred (un diferido), un resultado futuro (sin bloqueo) que representa una promesa de proporcionar un resultado más adelante. Es decir, que async no bloquea el hilo, continúa su ejecución pero el resultado que devuelve, ya vendrá…
Puede usar .await() en un valor diferido (devolución de async) para obtener su resultado final, pero un Deferred, también es un Job, por lo que puede cancelarlo si es necesario.

Lazily started async

Opcionalmente, async se puede hacer lazy (retardado, perezoso), configurando su parámetro de inicio en CoroutineStart.LAZY.
En este modo, sólo inicia la rutina cuando await requiere su resultado o si se invoca la función de start del Job. Mira el siguiente ejemplo:

fun main(args: Array<String>) = runBlocking {
// Lazily started async
       val time = measureTimeMillis {
           val one = async(start = CoroutineStart.LAZY) {
               log("Iniciando la corrutina 1")
               doSomethingUsefulOne()
           }
           val two = async(start = CoroutineStart.LAZY) {
               log("Iniciando la corrutina 2")
               doSomethingUsefulTwo()
           }
           // some computation
           one.start() // start the first one
           two.start() // start the second one
           println("The answer is ${one.await() + two.await()}.")
       }
       println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

Secuencias en Kotlin.

Un pequeño previo primero antes de ver las corrutinas y los Flows.

Como veremos los Flows son Secuencias Asíncronas. Por tanto es interesante entender qué son las secuencias, ya que además pueden optimizar el tiempo de ejecución respecto a usar colecciones: Secuencias en Kotlin

  • Las secuencias son colecciones.
  • El calculo de los elementos de la coleccion se hace cuando se necesitan o se accede a la secuencia. Ventajas:
    • Las secuencias pueden ser infitas, se pueden basar en una semilla y una operación sobre ella, por lo que los valores pueden ser infinitos.
    • Nos permiten evitar pasos intermedios en las operaciones con colecciones, al trabajar con una secuencia no se generan las colecciones itermedias de las operaciones aplicadas.

Crear secuencias

Podemos crear secuencias con sequenceOf() o bien usar .asSequence() si partimos ya de una colección.

//  //CREAR SECUENCIAS
    //Crear una secuencia con SequenceOf
    val sequeceOfInt = sequenceOf(1, 2, 3, 4, 5);

    //Crear una secuencia a partir de otra colección
    val strNumbers = listOf("uno",  "dos", "tres", "cuatro", "cinco")
    val sequence = strNumbers.asSequence()

    //Filtramos,sin utilizar asSequence, los elementos de strNumbers que tienen más de 3 letras
    val strNumbersSize= strNumbers
        .filter { it.length > 3 } //esto crea una nueva lista
        .map{it.length} //esto crea otra nueva lista

    //Filtramos, utilizando asSequence, los elementos de strNumbers que tienen más de 3 letras
    val strNumbersSize2= strNumbers
        .asSequence() //esto crea una secuencia
        .filter { it.length > 3 } //esto no crea una nueva lista, sino que opera sobre la secuencia
        .map{it.length} //esto no crea una nueva lista, sino que opera sobre la secuencia
        .toList() //esto convierte la secuencia final en una lista.
    //Con esta forma hemos evitado crear dos listas intermedias

También podemos crear secuencias con la funcion geneerateSequence() que le pasamos una semilla y una función.

////  //CREAR SECUENCIAS CON generateSequence
    val oddNumbers = generateSequence(1) { it + 2 } //Empieza en 1 y va sumando 2
    //esto me genera una secuencia infinita de números impares

    //pero la usuaré para extraer de ella los que me interesen.
    //extraerermos los que no sean modulo de 3 y que al convertirlo a string su longitud sea menor o igual que 3
    //pero solo los numeros del 1 al 99 primeros
    val oddNumbers2 = generateSequence(1) { it + 2 }
        .filter { it % 3 != 0 }
        .map { it.toString() }
        .filter { it.length <= 3 }
        .take(99)
        .toList()

Creación de secuencias utilizando sequence y Yield(). Esta forma sirve mucho para entender los flows.Yield es una función de suspensión que proporciona un valor o una serie de valores y suspende hasta que le pidan el siguiente valor.

////  //CREAR SECUENCIAS CON sequence y yield
    val seq = sequence {
           yield(3)
    }
    println(seq.toList())

    //Ahora vamos a hacer una secuencia que genere los números del 1 al 5
    val seq2 = sequence {
        for (i in 1..5) {
            yield(i)
        }
    }
    println(seq2.toList())

    //Aqui generamos una secuencia que genere los números del 1 al 5 y luego los del 6 al 10
    //utilizando yieldAll que lo que hace es añadir una lista de elementos a la secuencia
    val seq3 = sequence {
        for (i in 1..5) {
            yield(i)
        }
        yieldAll(listOf(6, 7, 8, 9, 10))
    }
    println(seq3.toList())

    //Aqui generamos una secuencia que genere los números del 1 al 5 y luego los del 6 al 10
    //utilizando yieldAll que lo que hace es añadir una lista de elementos a la secuencia
    val seq4 = sequence {
        var last:Int=0
        for (i in 1..5) {
            yield(i)
            last=i;
        }
        //Añadimos con yieldAll la secuencia de numeros impares a partir del último valor de la anterior
        yieldAll(generateSequence(last+2) { it + 2 })
    }
    println(seq4.take(10).toList())

Tipos de operaciones con secuencias

Las operaciones con secuencias se pueden clasificar en función de si necesitan estado o no y también se pueden clasificar en función de si son operaciones intermedias o de finalización.

En cuando al estado: Operaciones Stateless y operaciones Statefull

  • Stateless: no necesitamos estado (o un estado muy pequeño) para generar la secuencia. Ejemplos: map, take, filter, drop
  • Statefull: necesitamos un estado grande para poder generar la secuencia. Esto lleva un coste como si se tratara de generar una colección, puesto que para calcular una secuencia necesita recorrer todos los elementos de la secuencia anterior para determinar la nueva. Ejemplos: Sorted, Distinct, Chunk
  • Intermediate: Genera otra secuencia
  • Terminal: Lanza la generación de todos los valores de la secuencia para convertirla en una colección o un valor final. Ejemplos: toList(), sum(),

Veremos que los Flows no son más que Secuencias Asíncronas.

Corrutinas Kotlin en Android

Una vez visto que son las secuencias vamos con las corrutinas.

Os dejo un vídeo explicativo de resumen y lo amplio en la descripción a continuación. Al final en el apartado de referencias del apartado de corrutinas tenéis más información para ampliar y complementar lo aquí expuesto.

Una de las grandes ventajas para el programador al hacer uso de corrutinas y las funciones suspend es que la lógica o el flujo de nuestro programa se asemeja a una programación lineal, donde una linea llama a una función y obtiene un resultado, la linea siguiente usa ese resultado, pero el resultado se ha obtenido en una operación asíncrona. De esta forma no tenemos que definir funciones callback tan habituales en la programación con Android Views. Ahora con Compose y Corrutinas, aunque inicialmente es mas lío si conoces el funcionamiento anterior, todo se simplifica y finalmente es más intuitivo.

Corrutinas Kotlin en 5 minutos, aplicadas en Android Conceptos que se tratan en el vídeo son (ampliados):

  • Tenemos varios Scopes, Application, Activity, ViewModel, … es importante diferenciar en qué ámbito definimos/lanzamos la corrutina.
    • Application Scope -> GlobalScope. La corrutina en este scope estará viva durante todo el ciclo de vida de la Application (Objeto Singleton Application)
  • Una vez definido el Scope usamos un Builder de Corrutina. Por ejemplo launch (o async).
  • Las funciones de suspensión que se usan dentro de una corrutina, bloquean la ejecución de la corrutina, pero no el hilo del scope en el que estamos.
  • Que la función de suspensión bloquee o no el hilo del scope depende del Dispatcher utilizado.
    Hay distintos Dispatchers (o Schedulers) asociados a las corrutinas.
  • Cada Scope tiene un dispatcher por defecto.
    El Dispatcher.default es el por defecto para el GlobalScope.
    Este dispatcher coloca la corrutina en un hilo diferente del hilo principal.
    En Kotlin, todas las corrutinas se deben ejecutar en un despachador, incluso cuando se ejecutan en el subproceso principal.
    La corrutinas se pueden suspender a sí mismas, y el despachador es responsable de reanudarlas.
    Para especificar en qué lugar deberían ejecutarse las corrutinas, Kotlin proporciona tres despachadores que puedes utilizar:
    • Dispatcher.Main: Utiliza este despachador para ejecutar una corrutina en el Main Thread de Android.
      Solo debes usar este despachador para interactuar con la IU y realizar trabajos rápidos.
    • Dispatcher.Default: Este despachador está optimizado para realizar trabajo que usa la CPU de manera intensiva fuera del Main Thread. Puede usar tantos subprocesos como cores tenga la CPU. Ya que estas son tareas intensivas, no tiene sentido tener más ejecuciones al mismo tiempo, porque la CPU estará ocupada. Algunos casos prácticos de ejemplo son clasificar una lista y analizar JSON.
    • Dispatcher.IO: Este despachador está optimizado para realizar E/S de disco o red fuera del subproceso principal. ya que no usan la CPU, se puede tener muchas en ejecución al mismo tiempo. Las Apps de Android, lo que más hacen, es interactuar con el dispositivo y hacer peticiones de red, por lo que probablemente usarás este la mayoría del tiempo. Algunos ejemplos incluyen usar el componente Room, leer desde archivos o escribir en ellos, y ejecutar operaciones de red.
  • Lo que querremos en Android es que la corrutina se ejecute en el hilo principal, pero que las funciones de suspensión que se lancen desde la corrutina se ejecuten en hilos separados.
    Esto es así para que la corrutina tenga acceso a todos los componentes del hilo principal y pueda actualizar datos de éstos con los resultados que vienen de las funciones de suspensión.
  • Con withContext(Dispatcher){código} podemos decidir que parte de código dentro de una corrutina se ejecuta en el scope asociado al dispatcher que se pasa como parámetro.
    withContext() devuelve el retorno que tenga la función o código dentro de el, bloqueando la corrutina hasta que obtiene el resultado, pero liberando el hilo principal.
    Veamos el ejemplo (usado en Application):
GlobalScope.launch(Dispatcher.Main){ //this:CoroutineScope
  val result = withContext(Dispatcher.IO){
                  DataProvider.DoHeavyTask()
               }
  println(result)
}
  • Cuando queremos realizar algo a nivel de Activity necesitamos un Scope a nivel de Activity para que cuando muera la activity, el proceso muera también.
    El objeto CoroutineScope realiza un seguimiento de cualquier corrutina que crea mediante los elementos launch o async.
    Android proporcina para la Activity el lifecycleScope y para el ViewModel el viewModelScope
    Veamos el ejemplo (usado en Activity y en ViewModel): (No hace falta indicar el Dispatcher.Main en el launch porque por defecto usan ese.
//Para usar en una clase Activity
lifecycleScope.launch{ //this:CoroutineScope
  val result = withContext(Dispatcher.IO){
                  DataProvider.DoHeavyTask()
               }
  println(result)
}

//Para usar en una clase ViewModel
viewModelScope.launch{ //this:CoroutineScope
  val result = withContext(Dispatcher.IO){
                  DataProvider.DoHeavyTask()
               }
  println(result)
}
  • Builders:
    • runBlocking: Este builder bloquea el hilo actual hasta que se terminen todas las tareas dentro de esa corrutina.
      Esto va en contra de lo que queremos lograr con las corrutinas. Entonces, ¿para qué sirve?
      Es muy útil para implementar tests sobre suspending tasks. En tus tests, envuelve la suspending task que desea probar con una llamada runBlocking y podrás asertar sobre el resultado y evitar que el test finalice antes de que finalice la tarea en segundo plano.
      O para probar como hemos hecho anteriormente en el proyecto Kotlin.
      No se usará habitualmente.
    • launch: Este es el builder más usado.
      Lo utilizarás mucho porque es la forma más sencilla de crear corrutinas.
      A diferencia de runBlocking, no bloqueará el subproceso actual (si usamos los dispatchers adecuados, claro).
      Este builder siempre necesita un scope.
      launch devuelve un Job, que es otra clase que implementa CoroutineContext.
      Los jobs tienen un par de funciones interesantes que pueden ser muy útiles, las vemos más abajo.
      Pero es importante entender que un Job puede tener a su vez otro Job padre.
      Ese job padre tiene cierto control sobre los hijos, y ahí es donde entran en juego estas funciones (Join y Cancel).
    • async: async permite ejecutar varias tareas en segundo plano en paralelo.
      No es una función de suspensión en sí misma, por lo que cuando ejecutamos async, el proceso en segundo plano se inicia, pero la siguiente línea se ejecuta de inmediato.
      async siempre debe llamarse dentro de otra corrutina y devuelve un job especializado que se llama Deferred.
      Este objeto Deferred tiene una nueva función llamada await() que es la que bloquea.
      Llamaremos a await() solo cuando necesitemos el resultado.
      Si el resultado aún no esta listo, la corrutina se suspende en ese punto.
      Si ya tenemos el resultado, simplemente lo devolverá y continuará.
      De esta manera, puedes ejecutar tantas tareas en segundo plano como necesites.

Jobs y funciones

  • El elemento Job es un controlador de corrutinas.
    Cada corrutina que creas con launch o async muestra una instancia de Job que identifica de forma única la corrutina y administra su ciclo de vida.
  • job.join(): Con está función, puedes bloquear la corrutina asociada con el job hasta que todos los jobs hijos hayan finalizado.
    Todas las funciones de suspensión que se llaman dentro de una corrutina están vinculadas a job, así que el job puede detectar cuándo finalizan todos los jobs hijos y después continuar la ejecución.
    job.join() es una función de suspensión en sí misma, por lo que debe llamarse dentro de otra corrutina.
val job = GlobalScope.launch(Dispatchers.Main) {
    doCoroutineTask()
    val res1 = suspendingTask1()
    val res2 = suspendingTask2()
    process(res1, res2)
}
job.join()
  • job.cancel(): Esta función cancelará todos sus jobs hijos asociados.
    Así que, si por ejemplo mientras se está ejecutando suspendingTask1() se llama a cancel(), este no devolverá el valor a res1 y suspendingTask2() no se ejecutará nunca. Recordar que como no se ha usado async las suspendingTasks del ejemplo van en secuencial.
    job.cancel() esta es una función normal, por lo que no requiere una corrutina para ser llamada.

Algunas referencias

Flujos de Kotlin

Un flujo es un tipo de corrutina que puede emitir varios valores de manera secuencial, en lugar de funciones de suspensión que pueden retornar un único valor.

Un flujo conceptualmente es una transmisión de datos que se puede efectuar de forma asíncrona.

Los valores emitidos deben ser del mismo tipo.
Por ejemplo, un Flow de enteros es un flujo que emite valores enteros, pero pueden ser de cualquier otro tipo: String, Float, un data class Persona etc.

Un flujo puede enviar de forma segura una solicitud de red para producir el siguiente valor (del flujo) sin bloquear el subproceso principal de la aplicación y evitar su bloqueo.

Los Flows no son más que unas colecciones lazy, es decir, hace falta una operación terminal que pida el valor del flow, obtener un resultado, la secuencia como tal no se va a procesar.

Eso nos aporta varias ventajas:

  • La primera de ellas es que no se van a generar datos que no necesitamos
  • Y otra ventaja es que podemos crear una secuencia infinita de tal forma que se pueden hacer cosas que con colecciones normales como una lista por ejemplo no sería posible.

La mayor diferencia entre un flow y una secuencia es que el flow es asíncrono, es decir, no necesita generar los todos los resultados en el mismo momento en el que se pide. En una secuencia cuando nosotros hacemos una operación terminal todos los valores de esa secuencia se generan en ese momento, recuperamos el resultado y hacemos con él lo que queramos.

En los flows son asíncronos, es decir, las peticiones al flow pueden no ser generadas inmediatamente y llevar su tiempo obtenerlas. De hecho un flow se puede quedar indefinidamente esperando resultados.

Los Flows corren en el contexto de una Corrutina y por tanto podemos lanzar funciones de suspensión dentro de los flow que nos generen los resultados que nosotros necesitemos.

Los Flows por defecto son Lazy o Cold Streams, es decir, hasta que alguien no se conecta para recibir resultados, estos resultados no se generan. Un Flow no hace nada hasta que alguien empieza a escuchar los valores del mismo.

Recuperar datos de un Flow se denomina Recolectar o Collect.

Si hay otro recolector que se conecta con el mismo Flow en otro momento, va a volver a recibir toda la secuencia desde el principio, no en el punto en que el primer recolector estaba recolectando. Esto es el funcionamiento estandar y se puede configurar de otra forma.

Continuamos con el proyecto Corrutinas para añadir código para los Flows:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow

data class Persona(val nombre: String, val edad: Int)


fun retornarPersona(): Flow<Persona> = flow {
    val lista = listOf(
        Persona("diego", 53),
        Persona("juan", 33),
        Persona("ana", 33)
    )
    for (elemento in lista) {
        delay(1000)
        emit(elemento)
    }
}

fun main() {
    runBlocking {
        retornarPersona().collect(){
            log("${it.nombre} ${it.edad}")
        }
        log("Después del collect")
    }
}

Flow es una interface y mediante la llamada a la función flow pasamos una función lambda, donde mediante la llamada de la función emit retornamos el dato que será procesado desde la llamada a la función collect.

Como collect es una función de suspensión, la misma debe llamarse desde un bloque de una corrutina, en nuestro caso dentro de runBlocking.

Podemos imaginar la recuperación de datos de personas que se encuentran en un servidor que requiere un tiempo no trivial.

Podemos hacer la llamada a la función ‘retornarPersona‘ en forma asíncrona para seguir con la ejecución del hilo principal del programa y no tener que esperar la recuperación de todos los datos:

fun main() {
    runBlocking {
        val diferido =  async {
            retornarPersona().collect(){ //it:Persona
                log("${it.nombre} ${it.edad}")
            }
        }
        log("Después del async")
    }
   println("Fin de main")
}

.asFlow()

Permite generar un Flow a partir de una lista.
En el siguiente ejemplo vemos como de una lista de enteros se ha generado un flow y cómo se recolecta.

//asFlow()
fun main() {
  runBlocking {
    makeFlow().collect(){
        println(it)
    }
    //Otra forma de llamar de forma que para cada elemento se ejecuta una función
    //es utilizar una referencia a la función en la llamada.
    makeFlow().collect(::println)
  }
  println("Fin de main")
}


fun makeFlow(): Flow<Int> {
    return listOf(1,2,3,4,5).asFlow()
}

Generando un Flow

Podemos crear un flow<T> genérico a partir de iteradores o bucles como en el siguiente ejemplo:

Con la función emit(item) emitimos al flow cada elemento.

//Generando un Flow a partir de bucles
fun main() {
  runBlocking {
    makeFlow2().collect(){
        println(it)
    }
    //Otra forma de llamar de forma que para cada elemento se ejecuta una función
    //es utilizar una referencia a la función en la llamada.
    makeFlow().collect(::println)
  }
  println("Fin de main")
}

fun makeFlow2(): Flow<Int> {
    return flow<Int>{
        for (i in 1..10){
            emit(i)
        }
    }
}

Tal como están estos dos ejemplos anteriores no aportan nada respecto a iterar por una colección.

Pero con los flows, al estar en el contexto de una corrutina podemos utilizar una función de suspensión que nos de los datos desde por ejemplo un servidor o una base de datos. El código se vería así:

//Generando un Flow
fun main() {
    runBlocking {
        makeFlow3().collect(){
            log("Recibido: " + it.toString())
        }
    }
  println("Fin de main")
}
fun makeFlow3(): Flow<Int> {
    return flow<Int>{
        for (i in 1..20){
            log("Pedimos datos al servidor")
            val data = GetAsyncData()
            log("  Lo tenemos, vamos a emitir: $data")
            emit(data)
        }
    }
}
suspend fun GetAsyncData(): Int {
    return withContext(Dispatchers.IO){
        //simulamos la ejecución en el servidor
        delay(2000)
        Random.nextInt(1, 100)
    }
}

Operaciones intermedias

Podemos obtener dados del flow, pero tratarlos como hacíamos con las colecciones, aplicar filtros, procesamientos, etc.. e ir encadenando operaciones hasta finalmente tener lo esperado.

Por ejemplo aplicamos un .filter para quedarnos con los pares. (usamos GetAsyncData con un delay de 500 para hacerlo más rápido y aumentamos el numero de veces que lo llamamos a 30)
También usamos un map para transformar(usar) el dato y sacar un string.

//Operacioines intermedias
fun main() {
    runBlocking {
        makeFlow3()
            .filter { it % 2 == 0  } //Nos quedamos solo con los pares
            .map{
                "Obtenido el par: $it"
            }
            .collect(){
                println(it)
                }
    }


  println("Fin de main")
}


fun makeFlow3(): Flow<Int> {
    return flow<Int>{
        for (i in 1..10){
            val data = GetAsyncData()
            emit(data)
        }
    }
}

suspend fun GetAsyncData(): Int {
    return withContext(Dispatchers.IO){
        //simulamos la ejecución en el servidor
        delay(1000)
        Random.nextInt(1, 100)
    }
}

Con las colecciones, cada operación intermedia generaba otra colección con el resultado de la operación.

Con los flows esto no ocurre, finalmente tenemos un único Flow que es el resultante de haber aplicado todas las operaciones intermedias y es realmente a ese flow final al que nos enganchamos para recolectar con collect().
Esto hace que el procesamiento sea más rápido y liviano que con las colecciones. Realmente es lo mismo que con las secuencias, pero los flows son asíncronos.

De todas formas el código anterior no funciona bien, me muestra datos impares, a pesar de que el filtro es correcto. ¿Por qué?

El problema esta en la función makeFlow3(). Esta función genera un flujo de números aleatorios entre 1 y 100. Si los números generados no son pares, aún así se emitirán en el flujo. El filtro .filter { it % 2 == 0 } se aplica después de que los números se generan y se emiten. Por lo tanto, si la función makeFlow3() está generando números impares, estos serán emitidos antes de que el filtro tenga la oportunidad de eliminarlos. Para solucionar este problema, puedes mover la lógica de filtrado a la función makeFlow3(), de modo que solo se emitan números pares.

Tipos especializados de Flows

Se han creado ciertos tipos avanzados de flows que permiten operaciones, en contextos determinados de la programación, más fáciles de realizar que utilizando el flow básico.

Estos son :

  • StateFlow
  • SharedFlow
  • Channel
  • CallbackFlow

SateFlow

A diferencia de un flow normal, que denominábamos Cold Flow, un StateFlow es un flujo que está continuamente generando valores, aunque nadie los lea, por eso se les denomina Hot Flow.

Cuando pedimos un valor al StateFlow, no nos da todos los valores, sino únicamente el último valor, el estado actual del flujo.

Nos servirán para resolver la suscripción a un estado de nuestra aplicación. Nos interesa conocer sólo el estado actual, no los valores anteriores por los que ha pasado el estado.

En el momento de la suscripción al StateFlow se nos entrega el último valor del estado. Veremos más adelante que es un caso especial de SharedFlow.

En el proyecto Corrutinas creamos una clase que nos simula el estado, la llamamos ViewState simulando el estado de una vista.

Creamos una variable privada _state que la asignamos a un StateFlow o a un MutableStateFlow. La diferencia es que los valores del MutableStateFlow podemos cambiarlos. Los StateFlows (mutables o no) deben estar siempre inicializados. En este caso lo inicializamos a 1.

Creamos la variable privada para que nuestra clase (las funciones de la misma) la puedan modificar, pero ofertamos una variable pública inmutable para que los recolectores no puedan cambiarla. Convertimos la mutable en inmutable con .asStateFlow() o bien haciendo un getter con val (inmutable). Esta ultima opción es la del ejemplo.

Además vemos que como un StateFlow únicamente tiene un valor (value) no necesitamos hacer emit(), simplemente modificando la propiedad .value actualizamos el único valor, el último.

class ViewState(){
    private val _state = MutableStateFlow(1)
    val state
        get() = _state

    suspend fun startUpdating(){
        while(true){
            delay(2000)
            _state.value = _state.value +1
        }
    }
}

Ahora en nuestro main, en el runBlocking vamos a ponernos a escuchar ese estado.

Lo que hacemos es crear una variable de nuestra clase ViewState.
Le decimos a nuestra instancia que comience a emitir valores llamando a su método startUpdating(). Pero esto tenemos que hacerlo en una corrutina, ya que la función startUpdating() es un bucle sin fin y por tanto si no lo ponemos en una corrutina, nunca podríamos avanzar a la siguiente línea (la de collect())
En esta siguiente línea, accedemos al estádo público (que es un StateFlow) y nos ponemos a recolectar con collect(::print) (imprimimos el it)

Podemos poner un delay para que no empiece a recolectar pasados unos segundos, con lo que veremos que el estado ha ido evolucionando (cada 2 segundos) a nuevos valores aunque no los hayamos recolectado.

Nuestro bloque principal quedaría asi:

//StateFlow
    runBlocking {
        val viewState = ViewState()
        launch{
            viewState.startUpdating()
        }
        delay(5000) //Comenzamos a recolectar pasados 5 segundos
        viewState.state.collect(::println)
    }

Shared Flows

Domina los Flows en Kotlin (Shared Flows)

Channels

Domina los Flows en Kotlin (Channels)

Callback Flows

Domina los Flows en Kotlin (Callback Flows)

Flows en Kotlin aplicados a Android

Video   Kotlin Flows in practice

Arquitectura basada en Flows de datos desde diversas fuentes, gestionadas por el Repositorio

Recursos y referencias: