Entenent l'asincronia amb Python (asyncio)

Roger Boixader Güell

7/1/2023

Aquest article és una adaptació al català de la xerrada que va fer Juan Pedro Fisanotti (https://www.youtube.com/watch?v=u_NDCBdHhzc).

Avui dia, per poder treballar amb asincronia amb Python tenim una llibreria anomenada asyncio que ens permet poder fer codi asíncron amb Python. A vegades pot semblar una mica feixuc entendre realment com funciona per dins aquesta llibreria i com executa les funcions de forma asíncrona, així que primer intentarem explicar com podem fer codi asíncron amb Python per després entendre molt més fàcilment la llibreria i totes les funcionalitats que ens dona.

Per començar anem a veure el que són els generadors amb Python.

Generadors

Els generadors ens permeten iterar sobre elements, però sense tenir tots els elements carregats en memòria. Per fer això tenim la paraula clau yield que el que fa és aturar l'execució de la funció en un punt i retornar un valor, és a dir, va generant a mesura que ho anem demanant.

def funcio_generadora():
    yield 'primer valor'
    yield 'segon valor'
    yield 'tercer valor'


if __name__ == "__main__":
    generador = funcio_generadora()
    print(next(generador))
    print(next(generador))

Com podem veure, la forma més primitiva de consumir els generadors és cridant a la funció next. En aquest exemple obtindríem el primer i el segon valor.

Concurrència

Anem a parlar de què és la concurrència.

La concurrència bàsicament és poder fer dues accions a la vegada, però realment no s'estan fent a la vegada, per exemple, estic escrivint un article mentre cuino el dinar. Realment jo estic fent les dues coses, però si paro el temps i miro què estic fent estaré o escrivint o cuinant. Primer escric una estona, després cuino, i així fins a acabar les dues tasques, anant fent una mica de cadascuna.

Si portem aquest concepte al món de la programació, tenint en compte que només tenim 1 sol nucli de CPU, ja que si tenim més d'un nucli i més d'un procés llavors estaríem parlant de paral·lelisme, podem tenir un procés que té N fils.

Quan executem aquest procés, la CPU va executant una mica de cada fil, però sense control d'on s'atura per fer el canvi de context entre els fils. Com que els fils d'un mateix procés comparteixen memòria, ens podem trobar el cas que hi hagi incoherències amb les dades si un fil modifica un valor, que l'altre està utilitzant.

Gràcies al fet que els generadors són funcions que nosaltres podem decidir quan s'aturen, tenim el poder per controlar aquests problemes i per exemple no deixarem que cap altra funció s'executi mentre estigui fent un càlcul concret.

Gestor de tasques

Un cop hem vist com funcionen els generadors i amb què ens poden ajudar per executar diferents funcions concurrentment anem a veure com podem fer aquesta gestió. Necessitem implementar un bucle que vagi consumint totes les tasques que li anem llançant (nosaltres construirem una versió molt simplificada del que és el bucle de la llibreria asyncio).

def primera_tasca():
    print('realitzem primera part primera tasca')
    yield
    print('realitzem segona part primera tasca')


def segona_tasca():
    print('realitzem primera part segona tasca')
    yield
    print('realitzem segona part segona tasca')
    yield
    print('realitzem tercera part segona tasca')


def loop(tasques):
    while tasques:
        tasca = tasques.pop(0)
        try:
            next(tasca)
            tasques.append(tasca)
        except StopIteration:
            pass


if __name__ == "__main__":
    loop([primera_tasca(), segona_tasca()])

Aquest bucle anirà consumint les tasques. Agafarà la primera, executarà fins que trobi un yield. En aquest punt poden passar dues coses. O que ja no hi hagi més codi per executar o que trobi un yield i aturi l'execució de la funció. Si no hi ha més codi per executar, retornarà l'excepció StopIteration, en cas que encara hi hagi més codi per executar la posarem a la cua per no tornar a executar la mateixa. Acabarem l'execució del bucle quan hàgim executat totes les tasques.

Si executem el codi anterior, podrem veure com va canviant de context i primer executa una mica de la primera tasca i després de la segona, fins a acabar les dues.

Tasques bloquejants

Ara ja tenim el nostre bucle que anirà consumint tasques, però que passa si en alguna d'aquestes tasques tenim alguna operació bloquejant? Podríem tenir una operació de lectura de disc, pujar una foto, etc. En aquest cas el que estaria passant és que bloquejaríem l'execució de les altres funcions fins que l'operació bloquejant no acabes. Per simular-ho podríem posar un sleep de 10 segons en alguna de les dues tasques, i podríem veure que no s'executa res fins passat aquests 10 segons.

import time

def primera_tasca():
    print('realitzem primera part primera tasca')
    time.sleep(10)
    yield
    print('realitzem segona part primera tasca')


def segona_tasca():
    print('realitzem primera part segona tasca')
    yield
    print('realitzem segona part segona tasca')
    yield
    print('realitzem tercera part segona tasca')


def loop(tasques):
    while tasques:
        tasca = tasques.pop(0)
        try:
            next(tasca)
            tasques.append(tasca)
        except StopIteration:
            pass


if __name__ == "__main__":
    loop([primera_tasca(), segona_tasca()])

Per solucionar aquest problema necessitem que totes les tasques bloquejants siguin també asíncrones, i a més necessitem una forma de dir-li al bucle que no volem seguir l'execució de la funció fins que la tasca bloquejant hagi acabat.

Yield from

Aquí és on entra el yield from. El que ens permet és executar una generador dins d'una altre generador. Anem a veure-ho amb codi. Si tornem a l'exemple inicial dels generadors, podem fer:

def funcio_generadora():
    yield 'primer valor'
    yield 'segon valor'
    yield from segona_funcio_generadora()
    yield 'tercer valor'

def segona_funcio_generadora():
    yield 'primer valor de la segona funcio generadora'
    yield 'segon valor de la segona funcio generadora'
    yield 'tercer valor de la segona funcio generadora'


if __name__ == "__main__":
    generador = funcio_generadora()
    for item in generador:
      print(item)

Aquí veiem una nova forma de consumir els generadors, que és amb un bucle for. Si executem aquest codi, veurem que primer mostra els dos primers valors de la funcio_generadora després mostra els 3 de la segona_funcio_generadora i finalment l'últim valor de la funcio_generadora. El que ens interessa aquí és que la funció main realment només ha consumit de la funcio_generadora i no sap que internament està consumint d'un altre generador. També la nostra funcio_generadora s'ha esperat que s'acabi la segona_funcio_generadora per continuar retornant els seus valors, i això és el que ens fa falta per quan les nostres tasques tenen altres tasques bloquejants i necessitem esperar-nos.

Tornant al nostre bucle podem fer alguna cosa de l'estil:

def primera_tasca():
    print('realitzem primera part primera tasca')
    yield from tasca_bloquejant()
    print('realitzem segona part primera tasca')


def segona_tasca():
    print('realitzem primera part segona tasca')
    yield
    print('realitzem segona part segona tasca')
    yield
    print('realitzem tercera part segona tasca')


def tasca_bloquejant():
    print('primer part tasca bloquejant')
    yield
    print('segona part tasca bloquejant')
    yield
    print('tercera part tasca bloquejant')


def loop(tasques):
    while tasques:
        tasca = tasques.pop(0)
        try:
            next(tasca)
            tasques.append(tasca)
        except StopIteration:
            pass


if __name__ == "__main__":
    loop([primera_tasca(), segona_tasca()])

Si executem aquest codi, podrem veure que la primera_tasca s'esperarà que acabi la tasca_bloquejant per seguir executant part del seu codi. Però realment el nostre bucle no sap que internament hi ha una tercera tasca, per ell continua tenint només dues tasques.

Ara ja tenim les nostres tasques bloquejants i el nostre loop no executarà més parts de les nostres tasques si a dins n'hi ha de bloquejants.

Tasques síncrones

Un dels problemes que ens apareixen en crear programes que són asíncrons, és que totes les funcions que utilitzem han de ser-ho, han de saber aturar-se per deixar el control i tornar a executar-se en un futur. Què passa si hem de fer servir una funció que és síncrona? Ja que la llibreria encara no té una versió asíncrona per exemple?

asyncio per resoldre-ho fa servir fils on en aquell fil executa la funció síncrona fora del loop principal i d'aquesta manera no el bloqueja i quan ha acabat retorna el resultat per poder-lo consumir.

Sintaxis async/await

Al Pyton 3.5 es van introduir les paraules claus async i await. D'aquesta manera ara podem substituir el yield from per await i marcar la funció com a async.

Asyncio

Ara hem representat el que seria un loop molt primitiu com el que ens dona asyncio, tot i que la llibreria introdueix moltes altres funcionalitats, com podria ser anar llançant tasques mentre ja hi ha tasques executant, marcar quan volem que s'executin, etc.

El nostre codi fet amb asyncio seria:

from asyncio import gather, run, sleep

async def primera_tasca():
    print('realitzem primera part primera tasca')
    await tasca_bloquejant()
    print('realitzem segona part primera tasca')


async def segona_tasca():
    print('realitzem primera part segona tasca')
    await sleep(0)
    print('realitzem segona part segona tasca')
    await sleep(0)
    print('realitzem tercera part segona tasca')


async def tasca_bloquejant():
    print('primer part tasca bloquejant')
    await sleep(0)
    print('segona part tasca bloquejant')
    await sleep(0)
    print('tercera part tasca bloquejant')


# def loop(tasques):
#     while tasques:
#         tasca = tasques.pop(0)
#         try:
#             next(tasca)
#             tasques.append(tasca)
#         except StopIteration:
#             pass


# if __name__ == "__main__":
#     loop([primera_tasca(), segona_tasca()])

async def main():
    print("Inici programa")
    await gather(primera_tasca(), segona_tasca())
    print("Fi programa")

run(main())