Skip to the content.

Torna all’indice generazione tempi >Versione in C++

SCHEDULAZIONE CON ASYNC/AWAIT

Base teorica

Gestione con I/O sincrono bloccante in figura a sinistra. Gestione con I/O asincrono e non bloccante nella stessa figura ma a destra.

alt text

Eventi vs thread

alt text

Il modello di gestione della CPU nei SO normalmente è di tipo multithreading preemptive, cioè con interruzione anticipata del task in esecuzione con riassegnazione della risorsa CPU ad un altro task, per almeno due motivi:

Il modello di gestione della CPU in ambienti server come node JS e client come l’ambiente javascript di un browser web, invece, è normalmente a singolo thread dove il multitasking è generato non utilizzando il multithreading ma un modello di esecuzione ad eventi (event driven runtime) composto da:

Callback

Un callback è una funzione che:

Le callback sono il modo principale in cui vengono implementate in un modello ad eventi le azioni di risposta ad un evento, spesso mediante funzioni definite una sola volta nel codice, tipicamente in forma anonima.

Le callback possono essere:

Modello ad eventi

I casi d’uso che potrebbero beneficiare di un modello a thread singolo ad eventi potrebbero essere:

Gli svantaggi sono ascrivibili a:

alt text

La libreria async.io ha un modello di runtime basato su un ciclo di eventi (event loop), che è responsabile:

Questo modello è abbastanza diverso dai modelli in altri linguaggi come C e Java basati su processi e thread. Una proprietà molto interessante è che un linguaggio ad eventi, a differenza di molti altri linguaggi, non blocca mai gli altri task quando si è in attesa di un input sul task corrente.

La gestione dell’I/O viene in genere eseguita tramite eventi e callback:

Il primo messaggio in coda viene di volta in volta estratto e processato per essere eseguito inserendo la sua callback, e tutte le funzioni ad essa annidate, in altrettanti frame sullo stack. La callback correntemente sullo stack, viene eseguita fino a che non ritornano tutte le sottofunzioni ad essa annidate.

Se le operazioni da svolgere nei task sono CPU intensive è buona norma delegarle a fornitori di servizi esterni al thread corrente, questi possono essere servizi in rete oppure servizi in esecuzione su altri thread. Una volta completata l’operazione delegata (può trascorrere un certo tempo), viene richiamata una callback (sul thread del loop degli eventi) con cui si notificano i risultati dell’operazione.

Allocazione della RAM

alt text

Gli oggetti sono allocati nella heap che è un’ampia regione di memoria per lo più non strutturata. Gli eventi sono immagazzinati in una coda di messaggi. Ogni messaggio ha una funzione associata (listener) che viene chiamata per gestire il messaggio.

Meccanismo di esecuzione

alt text

Ad un certo punto durante il ciclo di eventi, il processo runtime inizia a gestire i messaggi sulla coda, a partire da quello più vecchio. Per fare ciò:

L’elaborazione delle funzioni continua finché lo stack non è nuovamente vuoto. Quindi, il ciclo di eventi elaborerà il messaggio successivo nella coda (se ce n’è uno).

Loop di gestione degli eventi:

while queue.waitForMessage():
	queue.processNextMessage()

La funzione queue.waitForMessage() attende in modo sincrono l’arrivo di un messaggio (se uno non è già disponibile e in attesa di essere gestito).

Run to completition

Ogni messaggio viene elaborato completamente prima che venga elaborato qualsiasi altro messaggio.

Questo comportamento induce alcune proprietà desiderabili sui programmi, incluso il fatto che, ogni volta che una funzione viene eseguita, non può essere terminata in anticipo e verrà eseguita interamente prima dell’esecuzione di qualsiasi altro codice (che potrebbe alterare i dati manipolati dalla funzione). Ciò differisce da C, ad esempio, dove se una funzione viene eseguita in un thread, può essere interrotta in qualsiasi momento dal sistema di runtime (un timer HW) per eseguire altro codice in un altro thread.

Uno svantaggio di questo modello è che se un messaggio richiede troppo tempo per essere completato, l’applicazione non è in grado di elaborare le interazioni dell’utente come il clic di un pulsante. Una buona pratica da seguire è rendere breve l’elaborazione dei messaggi e, se possibile, scomporre un messaggio in più messaggi. In caso contrario si può utilizzare un meccanismo di delega asincrona delle operazioni pesanti a servizi in rete o a worker su altri thread.

Aggiunta di un messaggio in coda

Future

Una future è un oggetto restituito da una funzione asincrona, che rappresenta lo stato corrente dell’operazione di recupero di una informazione (da I/O o da un timer HW).

Async/await

Il caso d’uso principale per async/await è rendere più semplice la scrittura di codice asincrono o, in generale, qualsiasi codice che utilizzi molte callback o futures/promesse. In particolare, async/await può aiutare a evitare l’inferno delle callback annidate (callback hell noto anche come piramide del destino) scrivendo programmi in stile imperativo (lineare e sequenziale) invece che ad eventi. L’obiettivo è ristrutturare il programma in modo che sia più facile da scrivere e mantenere per gli umani. Anche se i programmi asincroni sono il caso d’uso più comune, async/await non richiede che il codice sia effettivamente asincrono o utilizzi IO non bloccanti. Fondamentalmente, si tratta solo di trasformazione strutturale di un programma.

alt text

import uasyncio
async def c():
    print('2')
    await uasyncio.sleep_ms(2000)
    print('4')
    await uasyncio.sleep_ms(5000)
    print('5')

async def main():
    print('1')
    uasyncio.create_task(c()) # inserisce il task nel loop
    await uasyncio.sleep_ms(0) # permette di cominciare l'altro task prima che termini il corrente
    print('3')
    await uasyncio.sleep_ms(10000) # attende il tempo necessario per completare gli altri task

uasyncio.run(main()) # Crea un nuovo task dalla coroutine specificata e lo esegue fino al completamento.

Link simulazione online: https://wokwi.com/projects/369865863273101313

Le funzioni asincrone sono una tecnica che rende molto più intuitiva la gestione delle promesse svincolandola dall’esigenza di definire per ciascuna due callback.

Ciò avviene in Python tramite le cosidette “coroutine”. Una coroutine è una subroutine (funzione) che può essere sospesa e ripresa. Viene sospesa dall’espressione di await e ripresa una volta risolta la await. La ripresa dal punto di sospensione avviene mantenendo gli argomenti e le variabili locali della funzione sospesa.

La sospensione del flusso di controllo di un task avviene spontaneamente normalmente per due motivi:

Ad ogni sospensione pendente corrisponde l’attesa di un evento che risolve la Future (o la Promise) sbloccandola. Una volta risolta positivamente, viene inserita nella coda di esecuzione il microtask associato a quella Future. Potrebbe esserci pure un microtask alternativo associato ad una future non risolta (alla scadenza di un timeout).

Nonostante il nome, Il blocco di codice async diventa per tutte le funzioni che restituiscono una future, sincrono e bloccante nel senso che, ciascuna funzione con await davanti, rimane bloccata e non può passare all’istruzione successiva fino a che la sua future non viene risolta. Il blocco dell’esecuzione è in realtà solamente apparente perchè è sostanzialmente emulato ma, nonostante tutto, efficace nell’impedire l’esecuzione delle istruzioni seguenti all’interno dello stesso task.

Esistono delle differenze implementative nella realizzazione delle coroutine nei vari linguaggi. Il comportamento comune a tutti è che una coroutine, una volta invocata, non ritornerà subito alla funzione chiamante (sincrona) ma solo successivamente, seguendo un percorso a tappe, man mano che gli eventi che essa, di volta in volta, attende verranno risolti tutti. La differenza risiede essenzialmente nel comportamento della funzione chiamante:

Il modello async/await fornise un meccanismo di blocco dei task (compiti) in cima a un sistema ad eventi, senza l’overhead di uno stack per ogni thread. Lo scopo del modello è quello di implementare un flusso sequenziale di controllo senza utilizzare complesse macchine a stati finiti ed evitando l’overhead di un multi-threading completo, cioè quello dotato anche del modo preemptive. L’asynchronous I/O scheduler fornisce la sola modalità cooperativa e il rilascio anticipato delle risorse è realizzato in maniera cooperativa senza l’utilizzo di interrupt che, generati da un timer HW, determinino il cambio di contesto forzato dei thread.

Confronto con le altre tecniche

Anche i processi e i thread sono flussi di esecuzione indipendenti che procedono in parallelo su una o più CPU, esiste però una differenza pratica notevole tra di essi:

Utilizzo in pratica

Ogni task asincrono realizza un flusso di esecuzione indipendente da quello degli altri task, inoltre ognuno possiede un proprio loop() principale di esecuzione in cui realizzare le operazioni che tipicamente riguardano le tre fasi di lettura degli ingressi, calcolo dello stato del sistema e della sua risposta e la fase finale di scrittura della risposta sulle uscite. Il loop principale è definito sotto forma di ciclo infinito come ad esempio:

while True:
    # codice del task
    #.........................

Le fasi di lavoro del loop possono essere schedulate (pianificate nel tempo) dal delay() asyncio.sleep(t_sec) che restituiscono un oggetto future (future) che, pur essendo non bloccante, fa ritornare in anticipo la funzione, prima che possa eseguire altre istruzioni emulando quindi, in tutto e per tutto, una funzione bloccante. Questa proprietà permette la progettazione sostanzialmente lineare e sequenziale di un algoritmo nel tempo.

async def nome_task(x):
    # setup del tasl
    count = 0
    # loop del tasl
    while True: 
        count += 1
        print('Instance: {} count: {}'.format(x, count))
        await asyncio.sleep(delay_secs)
        print('Hello')
        await asyncio.sleep_ms(delay_ms)

Se si volesse bloccare l’esecuzione di un task finchè un flag, asserito in un altro task, non la sblocca si deve tenere conto che non si può usare un blocco del genere

    global flag
    while not flag:
        pass
    flag = False
    #.........................

ma piuttosto il seguente:

    global flag
    while not flag:
        await asyncio.sleep(0)
    flag = False
    #.........................

Il flusso di esecuzione di un task è definito all’interno di una funzione asincrona e può essere avviato passando allo schedulatore il riferimento a questa funzione sotto la forma di parametro. In sostanza la funzione serve al programmatore per definire il task e allo schedulatore per poterlo richiamare.

E’ importante notare che anche la funzione main deve essere resa asincrona con la parola chiave async se si desidera eseguirla in parallelo con gli altri task seguendo una modalità cooperativa. In sostanza, anche la funzione main deve diventare un task asincrono. Se così non fosse, una funzione di ritardo delay() oppure una qualunque funzione di I/O bloccante monopolizzarebbero l’unico thread a disposizione impedendo la schedulazione degli altri task.

Per quanto riguarda la definizione di un task va ricordato che ll’interno del loop del task ogni ramo di esecuzione va reso non bloccante inserendo, la funzione asyncio.sleep(10) (mai la normale delay()) se il flusso di esecuzione deve essere bloccato temporaneamente per un certo tempo fissato, oppure la funzione asyncio.sleep(0) se questo non deve essere bloccato. Ciò serve a richiamare lo schedulatore almeno una volta, qualunque direzione di esecuzione prenda il codice, in modo da cedere “spontaneamente” il controllo ad un altro task al termine del loop() del task corrente. La cessione del controllo dello schedulatore ad ogni ramo di esecuzione è necessario altrimenti gli altri task non verrebbero mai eseguiti (il sistema non è preemptive).

Sia asyncio.sleep(0) che asyncio.sleep(10) cedono il controllo della CPU allo schedulatore che lo assegna agli altri task che eventualmente in quel momento hanno scaduto il tempo di attesa di un loro delay.

Come già detto, il task principale che contine il main può essere mandato in esecuzione con la funzione asyncio.run(main())) .

Nell’esempio seguente, nel main vengono avviati tre task in parallello tramite la funzione asyncio.create_task(nome_task). Il tempo di vita dei task è legato a quello della loro funzione chiamante, cioè il main. Se questo termina anche i task creati al suo interno terminano.

import uasyncio as asyncio
async def bar(x):
    count = 0
    while True:
        count += 1
        print('Instance: {} count: {}'.format(x, count))
        await asyncio.sleep(1)  # Pause 1s

async def main():
    for x in range(3):
        asyncio.create_task(bar(x))
    await asyncio.sleep(10) # il main e i task correllati cesseranno dopo 10 sec

asyncio.run(main())

Link simulazione online: https://wokwi.com/projects/369675288427672577

Coroutines annidate

Per eseguire effettivamente una coroutine, asyncio fornisce tre meccanismi principali:

Il seguente frammento di codice illustra meglio l’ultimo caso. Verrà stampato “hello” dopo aver atteso 1 secondo, quindi stamperà “world” dopo aver atteso altri 2 secondi:

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Generazione di un event loop

In genere, viene realizzata completamente con una singola istruzione:

asyncio.run(main())  # Python 3.7+

ecco un modo più prolisso di gestire il ciclo di eventi asyncio, con get_event_loop(). Il modello tipico è simile a questo:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

Task concorrenti correlati

È possibile attendere il completamento di una serie di più attività in esecuzione in modo asincrono, accedendo al valore restituito di ognuna. Se qualsiasi waitable nella lista è una coroutine, viene automaticamente pianificata come task. Se tutti gli elementi in attesa vengono completati correttamente, il risultato è un elenco aggregato di valori restituiti. L’ordine dei valori dei risultati corrisponde all’ordine degli elementi attendibili nella lista fornoita come parametro.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

Una tipica app firmware

La maggior parte delle applicazioni firmware funziona ininterottamente per sempre. Ciò richiede che la coroutine del task sia passata a asyncio.run() e che dopo il main attende una await su una funzione non terminante (che giri per sempre).

Per facilitare il debug e per la compatibilità con CPython, nell’esempio seguente viene suggerito del codice “boilerplate”.

Per impostazione predefinita, un’eccezione in un’attività non interromperà l’esecuzione dell’intera applicazione. Questo può rendere difficile il debug.

È una cattiva pratica creare un’attività prima di eseguire asyncio.run(). CPython genererà un’eccezione in questo caso. MicroPython no, ma è saggio evitare di farlo.

Infine, uasyncio mantiene lo stato. Ciò significa che, per impostazione predefinita, è necessario riavviare tra le esecuzioni di un’applicazione. Questo può essere risolto con il metodo new_event_loop.

import uasyncio as asyncio
from my_app import MyClass

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = MyClass()  # Constructor might create tasks
    asyncio.create_task(my_class.foo())  # Or you might do this
    await my_class.run_forever()  # Non-terminating method
try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()  # Clear retained state

Esempi

Di seguito è riportato un esempio di blink sequenziale in esecuzione su due task separati su scheda ESP32, con IDE Wokwi e con la libreria uasync.io. La programmazione sequenziale del blink del led è emulata tramite una funzione delay() non bloccante asyncio.sleep() fornita dalla libreria uasync.io.

import uasyncio
from machine import Pin

async def blink(led, period_ms):
    while True:
        led.on()
        await uasyncio.sleep_ms(period_ms)
        led.off()
        await uasyncio.sleep_ms(period_ms)

async def main(led1, led2):
    uasyncio.create_task(blink(led1, 1000))
    uasyncio.create_task(blink(led2, 2000))
    await uasyncio.sleep_ms(10000)
    print('Ending all tasks')
    led1.off()
    led2.off()
    
led1 = Pin(12,Pin.OUT)
led2 = Pin(18,Pin.OUT)

uasyncio.run(main(led1, led2))

Link simulazione online: https://wokwi.com/projects/369678530188573697

Stesso codice di prima ma con un loop infinito nella funzione main che garantisce la non terminazione del main. Inoltre nel loop principale si potrebbero eseguire altri task in parallelo al blink come la gestione di un input (ad es.un pulsante).

import uasyncio
from machine import Pin

async def blink(led, period_ms):
    while True:
        led.on()
        await uasyncio.sleep_ms(period_ms)
        led.off()
        await uasyncio.sleep_ms(period_ms)

async def main(led1, led2):
    uasyncio.create_task(blink(led1, 1000))
    uasyncio.create_task(blink(led2, 2000))
    while True:
        await uasyncio.sleep_ms(500)

led1 = Pin(12,Pin.OUT)
led2 = Pin(18,Pin.OUT)

uasyncio.run(main(led1, led2))

Link simulazione online: https://wokwi.com/projects/369680006454647809

Di seguito è riportato un esempio di un blink sequenziale in esecuzione su un task e di gestione del pulsante sul loop principale. I ritardi sleep agiscono sul task secondario ma non bloccano la lettura dello stato del pulsante che rimane responsivo nell’accendere il secondo led durante entrambe le fasi del blink del primo led.

import uasyncio
from machine import Pin

async def blink(led, period_ms):
    while True:
        led.on()
        await uasyncio.sleep_ms(period_ms)
        led.off()
        await uasyncio.sleep_ms(period_ms)

async def main(btn, led1, led2):
    uasyncio.create_task(blink(led2, 1000))
    while True:
        if btn.value():
            led1.on()
        else:
            led1.off()
        
        await uasyncio.sleep_ms(50)

btn = Pin(12,Pin.IN)
led1 = Pin(13,Pin.OUT)
led2 = Pin(18,Pin.OUT)

uasyncio.run(main(btn, led1, led2))

Link simulazione online: https://wokwi.com/projects/369680948206974977

In questo caso, il rilevatore dei fronti è realizzato campionando il valore del livello al loop di CPU attuale e confrontandolo con il valore del livello campionato nello stesso loop ma in un momento diverso stabilito mediante un istruzione waitUntilInputLow(). La funzione, di fatto, esegue un blocco del task corrente in “attesa” della soddisfazione di una certa condizione, senza bloccare l’esecuzione degli altri task. L’attesa è spesa campionando continuamente un ingresso fino a che questo non diventa LOW. Quando ciò accade allora vuol dire che si è rilevato un fronte di discesa per cui, qualora in futuro, in un loop successivo, si determinasse sullo stesso ingresso un valore HIGH, allora si può essere certi di essere in presenza di un fronte di salita.

La funzione

// attesa evento con tempo minimo di attesa
async def waitUntilInLow(btn,t):
    while btn.value()):
	 await asyncio.sleep(t)
 

realizza una funzione di wait su condizione che ritarda il thread corrente di un delay() prefissato al termine del quale ricalcola l’ingresso. L’operazione viene ripetuta fin tanto che la condizione attesa non è asserita. Si tratta di una funzione utile per due scopi:

Pulsante toggle che realizza blink e antirimbalzo realizzato con una schedulazione sequenziale con i ritardi emulati all’interno di task diversi su uno stesso thread. La libreria usata è quella nativa dello ESP32 uasync.io:

#Alla pressione del pulsante si attiva o disattiva il lampeggo di un led 
import uasyncio
from machine import Pin

#attesa evento con tempo minimo di attesa
async def waitUntilInLow(btn,t):
    while btn.value():
	    await uasyncio.sleep_ms(t)

async def toggle(index, btn, states):
    while True:
    	if btn.value():
            states[index] = not states[index]
            print(states[index])
            await waitUntilInLow(btn,50)
        else:
            await uasyncio.sleep_ms(10)

async def blink(led, period_ms):
    while True:
        if stati[0]:
            #print("on")
            led.on()
            await uasyncio.sleep_ms(period_ms)
            #print("off")
            led.off()
            await uasyncio.sleep_ms(period_ms)
        else:
            led.off()
            await uasyncio.sleep_ms(10)

async def main(btn, led, states):
    uasyncio.create_task(toggle(0, btn, states))
    uasyncio.create_task(blink(led, 1000))
    
    while True:       
        await uasyncio.sleep_ms(50)

btn1 = Pin(12,Pin.IN)
led1 = Pin(13,Pin.OUT)
stati = [False]  # variabile globale che memorizza lo stato del pulsante

uasyncio.run(main(btn1, led1, stati))

Link simulazione online: https://wokwi.com/projects/370370343319005185

Osservazioni:

Quando si tratta di sistemi embedded, il modello cooperativo presenta due vantaggi.

Sitografia:

Torna all’indice generazione tempi >Versione in C++