asyncio Beispiel

Ab IPython≥7.0 könnt ihr asyncio direkt in Jupyter Notebooks verwenden; seht auch IPython 7.0, Async REPL.

Wenn ihr die Fehlermeldung RuntimeError: This event loop is already running erhaltet, hilft euch vielleicht [nest-asyncio] weiter.

Ihr könnt das Paket in eurer Jupyter- oder JupyterHub-Umgebung installieren mit

$ uv add nest-asyncio

Ihr könnt es dann in euer Notebook importieren und verwenden mit:

[1]:
import nest_asyncio


nest_asyncio.apply()

Zum Weiterlesen

Einfaches Hello world-Beispiel

[2]:
import asyncio


async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("world")


await hello()
Hello
world

Ein bisschen näher an einem realen Beispiel

[3]:
import random


async def publish(queue, n):
    for x in range(1, n + 1):
        # publish an item
        print(f"Publishing {x}/{n}")
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the publisher is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the publisher
        item = await queue.get()
        if item is None:
            # the publisher emits None to indicate that it is done
            break

        # process the item
        print(f"consuming {item}")
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


background_tasks = set()
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
publishing = asyncio.ensure_future(publish(queue, 10), loop=loop)
background_tasks.add(publishing)
publishing.add_done_callback(background_tasks.discard)
loop.run_until_complete(consume(queue))
Publishing 1/10
Publishing 2/10
consuming 1
Publishing 3/10
consuming 2
Publishing 4/10
Publishing 5/10
consuming 3
consuming 4
Publishing 6/10
Publishing 7/10
consuming 5
Publishing 8/10
consuming 6
Publishing 9/10
Publishing 10/10
consuming 7
consuming 8
consuming 9
consuming 10

Ausnahmebehandlung

[4]:
import logging
import signal


logger = logging.getLogger("stream_logger")


def handle_exception(context):
    msg = context.get("Exception", context["message"])
    logger.error(f"Caught exception: {msg}")
    logger.info("Shutting down…")


def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s,
            lambda s=s: asyncio.create_task(loop, signal=s),
        )
    loop.set_exception_handler(handle_exception)
    queue = asyncio.Queue()
    try:
        publish_task = loop.create_task(publish(queue))
        background_tasks.add(publish_task)
        publish_task.add_done_callback(background_tasks.discard)
        consume_task = loop.create_task(consume(queue))
        background_tasks.add(consume_task)
        consume_task.add_done_callback(background_tasks.discard)
        loop.run_forever()
    finally:
        loop.close()

Testen mit pytest

Beispiel:

Beim Testen muss man häufig Coroutinen simulieren, die innerhalb der zu testenden Funktion aufgerufen werden.

Dafür benötigen wir

Da die Bibliothek pytest-mock jedoch keine asynchronen Mocks unterstützt, müssen wir eine Umgehungslösung finden:

[5]:
import asyncio

import pytest


@pytest.fixture
def mock_coroutine(mocker, monkeypatch):

    def _mock_coroutine_pair(to_patch=None):
        mock = mocker.Mock()

        async def coroutine(*args, **kwargs):
            return mock(*args, **kwargs)

        if to_patch:
            monkeypatch.setattr(to_patch, _mock_coroutine_pair)

        return mock, _mock_coroutine_pair

    return _mock_coroutine_pair

Und für asyncio.Queue habe ich folgende Fixtures:

[6]:
@pytest.fixture
def mock_queue(mocker, monkeypatch):
    queue = mocker.Mock()
    monkeypatch.setattr(asyncio, "Queue", queue)
    return queue.return_value


@pytest.fixture
def mock_get(mock_coroutine):
    mock_get, _ = mock_coroutine()
    return mock_get
[7]:
@pytest.mark.asyncio
async def test_consume(mock_get, mock_queue):
    mock_get.side_effect = Exception("Break while loop")

    with pytest.raises(Exception, match="Break while loop"):
        await consume(mock_queue)

Bibliotheken von Drittanbietern

  • pytest-asyncio hat hilfreiche Dinge wie Test-Fixtures für event_loop, unused_tcp_port, und unused_tcp_port_factory; und die Möglichkeit zum Erstellen eurer eigenen asynchronen Fixtures.

  • asynctest verfügt über hilfreiche Werkzeuge, einschließlich Coroutine-Mocks und exhaust_callbacks so dass wir await task nicht manuell erstellen müssen.

  • aiohttp hat ein paar wirklich nette eingebaute Test-Utilities.

Debugging

asyncio hat bereits einen debug mode in der Standardbibliothek. Ihr könnt ihn einfach mit der Umgebungsvariablen PYTHONASYNCIODEBUG oder im Code mit loop.set_debug(True) aktivieren.

Verwendet den Debug-Modus zum Identifizieren langsamer asynchroner Aufrufe

Der Debug-Modus von asyncio hat einen kleinen eingebauten Profiler. Wenn der Debug-Modus aktiviert ist, protokolliert asyncio alle asynchronen Aufrufe, die länger als 100 Millisekunden dauern.

Debugging im Produktivbetrieb mit aiodebug

aiodebug ist eine kleine Bibliothek zum Überwachen und Testen von Asyncio-Programmen.

Beispiel

[8]:
import aiodebug.log_slow_callbacks

from aiodebug.logging_compat import get_logger


logger = get_logger(__name__)

aiodebug.log_slow_callbacks.enable(
    0.05,
    on_slow_callback=lambda task_name, duration: logger.warning(
        "Task blocked async loop for too long",
        extra={"task_name": task_name, "duration": duration},
    ),
)

Logging

aiologger ermöglicht eine nicht-blockierendes Logging.

Asynchrone Widgets

Siehe auch

[9]:
def wait_for_change(widget, value):
    future = asyncio.Future()

    def getvalue(change):
        # make the new value available
        future.set_result(change.new)
        widget.unobserve(getvalue, value)

    widget.observe(getvalue, value)
    return future
[10]:
from ipywidgets import IntSlider


slider = IntSlider()


async def f():
    for i in range(10):
        print(f"did work {i}")
        x = await wait_for_change(slider, "value")
        print(f"async function continued with value {x}")


task = asyncio.ensure_future(f())
background_tasks = set()

background_tasks.add(task)

task.add_done_callback(background_tasks.discard)

slider
[10]:
did work 0
async function continued with value 2
did work 1
async function continued with value 4
did work 2
async function continued with value 8
did work 3
async function continued with value 14
did work 4
async function continued with value 23
did work 5
async function continued with value 33
did work 6
async function continued with value 44
did work 7
async function continued with value 58
did work 8
async function continued with value 73
did work 9
async function continued with value 88