Tabla de Contenidos
Secuencias en Kotlin.
Un pequeño previo primero antes de ver las corrutinas y los Flows.
Como veremos los Flows son Secuencias Asíncronas. Por tanto es interesante entender qué son las secuencias, ya que además pueden optimizar el tiempo de ejecución respecto a usar colecciones: Secuencias en Kotlin
- Las secuencias son colecciones.
- El calculo de los elementos de la coleccion se hace cuando se necesitan o se accede a la secuencia. Ventajas:
- Las secuencias pueden ser infitas, se pueden basar en una semilla y una operación sobre ella, por lo que los valores pueden ser infinitos.
- Nos permiten evitar pasos intermedios en las operaciones con colecciones, al trabajar con una secuencia no se generan las colecciones itermedias de las operaciones aplicadas.
Crear secuencias
Podemos crear secuencias con sequenceOf() o bien usar .asSequence() si partimos ya de una colección.
// //CREAR SECUENCIAS //Crear una secuencia con SequenceOf val sequeceOfInt = sequenceOf(1, 2, 3, 4, 5); //Crear una secuencia a partir de otra colección val strNumbers = listOf("uno", "dos", "tres", "cuatro", "cinco") val sequence = strNumbers.asSequence() //Filtramos,sin utilizar asSequence, los elementos de strNumbers que tienen más de 3 letras val strNumbersSize= strNumbers .filter { it.length > 3 } //esto crea una nueva lista .map{it.length} //esto crea otra nueva lista //Filtramos, utilizando asSequence, los elementos de strNumbers que tienen más de 3 letras val strNumbersSize2= strNumbers .asSequence() //esto crea una secuencia .filter { it.length > 3 } //esto no crea una nueva lista, sino que opera sobre la secuencia .map{it.length} //esto no crea una nueva lista, sino que opera sobre la secuencia .toList() //esto convierte la secuencia final en una lista. //Con esta forma hemos evitado crear dos listas intermedias
También podemos crear secuencias con la funcion geneerateSequence() que le pasamos una semilla y una función.
//// //CREAR SECUENCIAS CON generateSequence val oddNumbers = generateSequence(1) { it + 2 } //Empieza en 1 y va sumando 2 //esto me genera una secuencia infinita de números impares //pero la usuaré para extraer de ella los que me interesen. //extraerermos los que no sean modulo de 3 y que al convertirlo a string su longitud sea menor o igual que 3 //pero solo los numeros del 1 al 99 primeros val oddNumbers2 = generateSequence(1) { it + 2 } .filter { it % 3 != 0 } .map { it.toString() } .filter { it.length <= 3 } .take(99) .toList()
Creación de secuencias utilizando sequence y Yield(). Esta forma sirve mucho para entender los flows.Yield es una función de suspensión que proporciona un valor o una serie de valores y suspende hasta que le pidan el siguiente valor.
//// //CREAR SECUENCIAS CON sequence y yield val seq = sequence { yield(3) } println(seq.toList()) //Ahora vamos a hacer una secuencia que genere los números del 1 al 5 val seq2 = sequence { for (i in 1..5) { yield(i) } } println(seq2.toList()) //Aqui generamos una secuencia que genere los números del 1 al 5 y luego los del 6 al 10 //utilizando yieldAll que lo que hace es añadir una lista de elementos a la secuencia val seq3 = sequence { for (i in 1..5) { yield(i) } yieldAll(listOf(6, 7, 8, 9, 10)) } println(seq3.toList()) //Aqui generamos una secuencia que genere los números del 1 al 5 y luego los del 6 al 10 //utilizando yieldAll que lo que hace es añadir una lista de elementos a la secuencia val seq4 = sequence { var last:Int=0 for (i in 1..5) { yield(i) last=i; } //Añadimos con yieldAll la secuencia de numeros impares a partir del último valor de la anterior yieldAll(generateSequence(last+2) { it + 2 }) } println(seq4.take(10).toList())
Tipos de operaciones con secuencias
Las operaciones con secuencias se pueden clasificar en función de si necesitan estado o no y también se pueden clasificar en función de si son operaciones intermedias o de finalización.
En cuando al estado: Operaciones Stateless y operaciones Statefull
- Stateless: no necesitamos estado (o un estado muy pequeño) para generar la secuencia. Ejemplos: map, take, filter, drop
- Statefull: necesitamos un estado grande para poder generar la secuencia. Esto lleva un coste como si se tratara de generar una colección, puesto que para calcular una secuencia necesita recorrer todos los elementos de la secuencia anterior para determinar la nueva. Ejemplos: Sorted, Distinct, Chunk
- Intermediate: Genera otra secuencia
- Terminal: Lanza la generación de todos los valores de la secuencia para convertirla en una colección o un valor final. Ejemplos: toList(), sum(),
Veremos que los Flows no son más que Secuencias Asíncronas.
Flujos de Kotlin
Un flujo es un tipo de corrutina que puede emitir varios valores de manera secuencial, en lugar de funciones de suspensión que pueden retornar un único valor.
Un flujo conceptualmente es una transmisión de datos que se puede efectuar de forma asíncrona.
Los valores emitidos deben ser del mismo tipo.
Por ejemplo, un Flow de enteros es un flujo que emite valores enteros, pero pueden ser de cualquier otro tipo: String, Float, un data class Persona etc.
Un flujo puede enviar de forma segura una solicitud de red para producir el siguiente valor (del flujo) sin bloquear el subproceso principal de la aplicación y evitar su bloqueo.
Los Flows no son más que unas colecciones lazy, es decir, hace falta una operación terminal que pida el valor del flow, obtener un resultado, la secuencia como tal no se va a procesar.
Eso nos aporta varias ventajas:
- La primera de ellas es que no se van a generar datos que no necesitamos
- Y otra ventaja es que podemos crear una secuencia infinita de tal forma que se pueden hacer cosas que con colecciones normales como una lista por ejemplo no sería posible.
La mayor diferencia entre un flow y una secuencia es que el flow es asíncrono, es decir, no necesita generar los todos los resultados en el mismo momento en el que se pide. En una secuencia cuando nosotros hacemos una operación terminal todos los valores de esa secuencia se generan en ese momento, recuperamos el resultado y hacemos con él lo que queramos.
En los flows son asíncronos, es decir, las peticiones al flow pueden no ser generadas inmediatamente y llevar su tiempo obtenerlas. De hecho un flow se puede quedar indefinidamente esperando resultados.
Los Flows corren en el contexto de una Corrutina y por tanto podemos lanzar funciones de suspensión dentro de los flow que nos generen los resultados que nosotros necesitemos.
Los Flows por defecto son Lazy o Cold Streams, es decir, hasta que alguien no se conecta para recibir resultados, estos resultados no se generan. Un Flow no hace nada hasta que alguien empieza a escuchar los valores del mismo.
Recuperar datos de un Flow se denomina Recolectar o Collect.
Si hay otro recolector que se conecta con el mismo Flow en otro momento, va a volver a recibir toda la secuencia desde el principio, no en el punto en que el primer recolector estaba recolectando. Esto es el funcionamiento estandar y se puede configurar de otra forma.
Continuamos con el proyecto Corrutinas para añadir código para los Flows:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow data class Persona(val nombre: String, val edad: Int) fun retornarPersona(): Flow<Persona> = flow { val lista = listOf( Persona("diego", 53), Persona("juan", 33), Persona("ana", 33) ) for (elemento in lista) { delay(1000) emit(elemento) } } fun main() { runBlocking { retornarPersona().collect(){ log("${it.nombre} ${it.edad}") } log("Después del collect") } }
Flow es una interface y mediante la llamada a la función flow pasamos una función lambda, donde mediante la llamada de la función emit retornamos el dato que será procesado desde la llamada a la función collect.
Como collect es una función de suspensión, la misma debe llamarse desde un bloque de una corrutina, en nuestro caso dentro de runBlocking.
Podemos imaginar la recuperación de datos de personas que se encuentran en un servidor que requiere un tiempo no trivial.
Podemos hacer la llamada a la función ‘retornarPersona‘ en forma asíncrona para seguir con la ejecución del hilo principal del programa y no tener que esperar la recuperación de todos los datos:
fun main() { runBlocking { val diferido = async { retornarPersona().collect(){ //it:Persona log("${it.nombre} ${it.edad}") } } log("Después del async") } println("Fin de main") }
.asFlow()
Permite generar un Flow a partir de una lista.
En el siguiente ejemplo vemos como de una lista de enteros se ha generado un flow y cómo se recolecta.
//asFlow() fun main() { runBlocking { makeFlow().collect(){ println(it) } //Otra forma de llamar de forma que para cada elemento se ejecuta una función //es utilizar una referencia a la función en la llamada. makeFlow().collect(::println) } println("Fin de main") } fun makeFlow(): Flow<Int> { return listOf(1,2,3,4,5).asFlow() }
Generando un Flow
Podemos crear un flow<T> genérico a partir de iteradores o bucles como en el siguiente ejemplo:
Con la función emit(item) emitimos al flow cada elemento.
//Generando un Flow a partir de bucles fun main() { runBlocking { makeFlow2().collect(){ println(it) } //Otra forma de llamar de forma que para cada elemento se ejecuta una función //es utilizar una referencia a la función en la llamada. makeFlow().collect(::println) } println("Fin de main") } fun makeFlow2(): Flow<Int> { return flow<Int>{ for (i in 1..10){ emit(i) } } }
Tal como están estos dos ejemplos anteriores no aportan nada respecto a iterar por una colección.
Pero con los flows, al estar en el contexto de una corrutina podemos utilizar una función de suspensión que nos de los datos desde por ejemplo un servidor o una base de datos. El código se vería así:
//Generando un Flow fun main() { runBlocking { makeFlow3().collect(){ log("Recibido: " + it.toString()) } } println("Fin de main") } fun makeFlow3(): Flow<Int> { return flow<Int>{ for (i in 1..20){ log("Pedimos datos al servidor") val data = GetAsyncData() log(" Lo tenemos, vamos a emitir: $data") emit(data) } } } suspend fun GetAsyncData(): Int { return withContext(Dispatchers.IO){ //simulamos la ejecución en el servidor delay(2000) Random.nextInt(1, 100) } }
Operaciones intermedias
Podemos obtener dados del flow, pero tratarlos como hacíamos con las colecciones, aplicar filtros, procesamientos, etc.. e ir encadenando operaciones hasta finalmente tener lo esperado.
Por ejemplo aplicamos un .filter para quedarnos con los pares. (usamos GetAsyncData con un delay de 500 para hacerlo más rápido y aumentamos el numero de veces que lo llamamos a 30)
También usamos un map para transformar(usar) el dato y sacar un string.
//Operacioines intermedias fun main() { runBlocking { makeFlow3() .filter { it % 2 == 0 } //Nos quedamos solo con los pares .map{ "Obtenido el par: $it" } .collect(){ println(it) } } println("Fin de main") } fun makeFlow3(): Flow<Int> { return flow<Int>{ for (i in 1..10){ val data = GetAsyncData() emit(data) } } } suspend fun GetAsyncData(): Int { return withContext(Dispatchers.IO){ //simulamos la ejecución en el servidor delay(1000) Random.nextInt(1, 100) } }
Con las colecciones, cada operación intermedia generaba otra colección con el resultado de la operación.
Con los flows esto no ocurre, finalmente tenemos un único Flow que es el resultante de haber aplicado todas las operaciones intermedias y es realmente a ese flow final al que nos enganchamos para recolectar con collect().
Esto hace que el procesamiento sea más rápido y liviano que con las colecciones. Realmente es lo mismo que con las secuencias, pero los flows son asíncronos.
De todas formas el código anterior no funciona bien, me muestra datos impares, a pesar de que el filtro es correcto. ¿Por qué?
El problema esta en la función makeFlow3(). Esta función genera un flujo de números aleatorios entre 1 y 100. Si los números generados no son pares, aún así se emitirán en el flujo. El filtro .filter { it % 2 == 0 } se aplica después de que los números se generan y se emiten. Por lo tanto, si la función makeFlow3() está generando números impares, estos serán emitidos antes de que el filtro tenga la oportunidad de eliminarlos. Para solucionar este problema, puedes mover la lógica de filtrado a la función makeFlow3(), de modo que solo se emitan números pares.
Tipos especializados de Flows
Se han creado ciertos tipos avanzados de flows que permiten operaciones, en contextos determinados de la programación, más fáciles de realizar que utilizando el flow básico.
Estos son :
- StateFlow
- SharedFlow
- Channel
- CallbackFlow
SateFlow
A diferencia de un flow normal, que denominábamos Cold Flow, un StateFlow es un flujo que está continuamente generando valores, aunque nadie los lea, por eso se les denomina Hot Flow.
Cuando pedimos un valor al StateFlow, no nos da todos los valores, sino únicamente el último valor, el estado actual del flujo.
Nos servirán para resolver la suscripción a un estado de nuestra aplicación. Nos interesa conocer sólo el estado actual, no los valores anteriores por los que ha pasado el estado.
En el momento de la suscripción al StateFlow se nos entrega el último valor del estado. Veremos más adelante que es un caso especial de SharedFlow.
En el proyecto Corrutinas creamos una clase que nos simula el estado, la llamamos ViewState simulando el estado de una vista.
Creamos una variable privada _state que la asignamos a un StateFlow o a un MutableStateFlow. La diferencia es que los valores del MutableStateFlow podemos cambiarlos. Los StateFlows (mutables o no) deben estar siempre inicializados. En este caso lo inicializamos a 1.
Creamos la variable privada para que nuestra clase (las funciones de la misma) la puedan modificar, pero ofertamos una variable pública inmutable para que los recolectores no puedan cambiarla. Convertimos la mutable en inmutable con .asStateFlow() o bien haciendo un getter con val (inmutable). Esta ultima opción es la del ejemplo.
Además vemos que como un StateFlow únicamente tiene un valor (value) no necesitamos hacer emit(), simplemente modificando la propiedad .value actualizamos el único valor, el último.
class ViewState(){ private val _state = MutableStateFlow(1) val state get() = _state suspend fun startUpdating(){ while(true){ delay(2000) _state.value = _state.value +1 } } }
Ahora en nuestro main, en el runBlocking vamos a ponernos a escuchar ese estado.
Lo que hacemos es crear una variable de nuestra clase ViewState.
Le decimos a nuestra instancia que comience a emitir valores llamando a su método startUpdating(). Pero esto tenemos que hacerlo en una corrutina, ya que la función startUpdating() es un bucle sin fin y por tanto si no lo ponemos en una corrutina, nunca podríamos avanzar a la siguiente línea (la de collect())
En esta siguiente línea, accedemos al estádo público (que es un StateFlow) y nos ponemos a recolectar con collect(::print) (imprimimos el it)
Podemos poner un delay para que no empiece a recolectar pasados unos segundos, con lo que veremos que el estado ha ido evolucionando (cada 2 segundos) a nuevos valores aunque no los hayamos recolectado.
Nuestro bloque principal quedaría asi:
//StateFlow runBlocking { val viewState = ViewState() launch{ viewState.startUpdating() } delay(5000) //Comenzamos a recolectar pasados 5 segundos viewState.state.collect(::println) }
Shared Flows
Domina los Flows en Kotlin (Shared Flows)
Channels
Domina los Flows en Kotlin (Channels)
Callback Flows
Domina los Flows en Kotlin (Callback Flows)
Flows en Kotlin aplicados a Android
Video Kotlin Flows in practice
