asyncio example¶
From IPython ≥ 7.0, you can use asyncio directly in Jupyter Notebooks; see also IPython 7.0, Async REPL.
If you get RuntimeError: This event loop is already running, [nest-asyncio] might help you.
You can install the package with
$ uv add nest-asyncio
You can then import it into your notebook and use it with:
[1]:
import nest_asyncio
nest_asyncio.apply()
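To see where this error comes from, here is a minimal sketch in plain Python, meant to be run as a script rather than in a notebook. The names `inner` and `outer` are illustrative only. It calls `run_until_complete` on a loop that is already running, which is roughly the situation inside a notebook cell when nest-asyncio has not been applied:

```python
import asyncio


async def inner():
    return 42


async def outer():
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        # The loop is already busy running outer(), so it refuses to nest
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)


message = asyncio.run(outer())
print(message)  # This event loop is already running
```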
See also:
asyncio: We Did It Wrong by Lynn Root
An Intro to asyncio by Mike Driscoll
Asyncio Coroutine Patterns: Beyond await by Yeray Diaz
Simple Hello world example¶
[2]:
import asyncio


async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("world")


await hello()
Hello
world
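The top-level `await` above relies on the event loop that IPython already runs. Outside a notebook, for example in a plain script, the same coroutine would typically be started with `asyncio.run()`. The following is a sketch under that assumption; the `delay` parameter and the return value are additions for illustration:

```python
import asyncio


async def hello(delay=0.1):
    print("Hello")
    await asyncio.sleep(delay)
    print("world")
    return "done"


# asyncio.run() creates a new event loop, runs the coroutine, and closes the loop
result = asyncio.run(hello())
```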
A little bit closer to a real world example¶
[3]:
import random


async def publish(queue, n):
    for x in range(1, n + 1):
        # publish an item
        print(f"Publishing {x}/{n}")
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the publisher is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the publisher
        item = await queue.get()
        if item is None:
            # the publisher emits None to indicate that it is done
            break

        # process the item
        print(f"consuming {item}")
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


background_tasks = set()
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
publishing = asyncio.ensure_future(publish(queue, 10), loop=loop)
background_tasks.add(publishing)
publishing.add_done_callback(background_tasks.discard)
loop.run_until_complete(consume(queue))
Publishing 1/10
Publishing 2/10
consuming 1
Publishing 3/10
consuming 2
Publishing 4/10
consuming 3
Publishing 5/10
consuming 4
Publishing 6/10
consuming 5
Publishing 7/10
consuming 6
Publishing 8/10
Publishing 9/10
consuming 7
Publishing 10/10
consuming 8
consuming 9
consuming 10
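The cell above depends on the notebook's event loop together with nest-asyncio. As a rough sketch of how the same publisher/consumer pattern might look in a standalone script, the variant below uses `asyncio.run()` and `asyncio.create_task()`. The `main` coroutine, the `consumed` list, and the short fixed sleep are assumptions made for illustration and are not part of the example above:

```python
import asyncio


async def publish(queue, n):
    for x in range(1, n + 1):
        await asyncio.sleep(0.01)  # simulate i/o
        await queue.put(str(x))
    await queue.put(None)  # sentinel: the publisher is done


async def consume(queue, consumed):
    while True:
        item = await queue.get()
        if item is None:
            break
        consumed.append(item)


async def main(n):
    queue = asyncio.Queue()
    consumed = []
    # Keep a reference to the task so it is not garbage-collected early
    publisher = asyncio.create_task(publish(queue, n))
    await consume(queue, consumed)
    await publisher
    return consumed


consumed = asyncio.run(main(3))
print(consumed)  # ['1', '2', '3']
```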
Exception Handling¶
[4]:
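import logging
import signal

logger = logging.getLogger("stream_logger")


# Assumed helper: the signal handler below needs a coroutine to schedule;
# this is one possible shutdown routine, not necessarily the intended one
async def shutdown(loop, signal=None):
    if signal:
        logger.info(f"Received exit signal {signal.name}…")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()


def handle_exception(loop, context):
    # context["message"] is always present, context["exception"] may not be
    msg = context.get("exception", context["message"])
    logger.error(f"Caught exception: {msg}")
    logger.info("Shutting down…")
    loop.create_task(shutdown(loop))


def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s,
            lambda s=s: loop.create_task(shutdown(loop, signal=s)),
        )
    loop.set_exception_handler(handle_exception)
    queue = asyncio.Queue()

    try:
        publish_task = loop.create_task(publish(queue, 10))
        background_tasks.add(publish_task)
        publish_task.add_done_callback(background_tasks.discard)
        consume_task = loop.create_task(consume(queue))
        background_tasks.add(consume_task)
        consume_task.add_done_callback(background_tasks.discard)
        loop.run_forever()
    finally:
        loop.close()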
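To illustrate what a loop exception handler receives, here is a small self-contained sketch. The names `caught` and `faulty_callback` are hypothetical. A callback scheduled with `call_soon` raises, and the handler installed with `set_exception_handler` is called with the loop and a context dictionary:

```python
import asyncio

caught = []


def handle_exception(loop, context):
    # "message" is always present; "exception" only if one was raised
    caught.append(context.get("exception", context["message"]))


def faulty_callback():
    raise ValueError("boom")


async def main():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handle_exception)
    loop.call_soon(faulty_callback)
    # Yield to the loop so that the scheduled callback can run
    await asyncio.sleep(0.01)


asyncio.run(main())
print(caught)
```

Note that the handler is not called for exceptions raised inside an awaited coroutine; those propagate to the awaiting code as usual.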
Testing with pytest¶
Example:¶
When testing, you often need to simulate coroutines that are called within the function you are testing.
For this, we need pytest-asyncio and pytest-mock.
However, the pytest-mock library does not support asynchronous mocks, so we need to find a workaround:
[5]:
import asyncio

import pytest


@pytest.fixture
def mock_coroutine(mocker, monkeypatch):
    def _mock_coroutine_pair(to_patch=None):
        mock = mocker.Mock()

        # The coroutine forwards every call to the synchronous mock,
        # so calls can be inspected with the usual Mock assertions
        async def coroutine(*args, **kwargs):
            return mock(*args, **kwargs)

        if to_patch:
            monkeypatch.setattr(to_patch, coroutine)
        return mock, coroutine

    return _mock_coroutine_pair
For asyncio.Queue, we use the following fixtures:
[6]:
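@pytest.fixture
def mock_queue(mocker, monkeypatch):
    queue = mocker.Mock()
    monkeypatch.setattr(asyncio, "Queue", queue)
    return queue.return_value


@pytest.fixture
def mock_get(mock_queue, mock_coroutine):
    mock_get, coroutine_get = mock_coroutine()
    # Attach the coroutine so that ``await queue.get()`` calls the mock
    mock_queue.get = coroutine_get
    return mock_get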
[7]:
@pytest.mark.asyncio
async def test_consume(mock_get, mock_queue):
    mock_get.side_effect = Exception("Break while loop")

    with pytest.raises(Exception, match="Break while loop"):
        await consume(mock_queue)
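As an alternative to a hand-written coroutine wrapper, the standard library has provided `unittest.mock.AsyncMock` since Python 3.8. The following sketch shows the idea outside of pytest; `consume_one` is a hypothetical, simplified stand-in for the `consume` coroutine above:

```python
import asyncio
from unittest.mock import AsyncMock, Mock


async def consume_one(queue):
    # Simplified stand-in: fetch and process a single item
    item = await queue.get()
    return f"consumed {item}"


queue = Mock()
# Calling an AsyncMock returns an awaitable that resolves to return_value
queue.get = AsyncMock(return_value="1")

result = asyncio.run(consume_one(queue))
queue.get.assert_awaited_once()
print(result)  # consumed 1
```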
Third-party libraries¶
pytest-asyncio has helpful things like fixtures for event_loop, unused_tcp_port, and unused_tcp_port_factory, and the ability to create your own asynchronous fixtures.
asynctest has helpful tooling, including coroutine mocks and exhaust_callbacks so we don’t have to manually await tasks.
aiohttp has some really nice built-in test utilities.
Debugging¶
asyncio already has a debug mode in the standard library. You can simply activate it with the PYTHONASYNCIODEBUG environment variable or in the code with loop.set_debug(True).
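In addition to the two options mentioned above, `asyncio.run()` accepts a `debug` argument. A minimal sketch that enables debug mode this way and checks it from inside the running loop:

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    # get_debug() reports whether debug mode is active for this loop
    return loop.get_debug()


debug_enabled = asyncio.run(main(), debug=True)
print(debug_enabled)  # True
```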
Using the debug mode to identify slow async calls¶
asyncio’s debug mode has a tiny built-in profiler. When debug mode is on, asyncio will log any asynchronous calls that take longer than 100 milliseconds.
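The sketch below tries to make this visible. A coroutine blocks the loop with `time.sleep()`, and a small list-collecting handler (a hypothetical helper, `ListHandler`) is attached to the `asyncio` logger to capture the warning. The threshold itself can be adjusted via the loop attribute `slow_callback_duration`, given in seconds:

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect formatted log messages in a list (illustrative helper)."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logging.getLogger("asyncio").addHandler(handler)


async def blocking():
    # time.sleep() blocks the whole event loop, unlike asyncio.sleep()
    time.sleep(0.2)


asyncio.run(blocking(), debug=True)

for message in handler.messages:
    print(message)
```

The captured messages should include one of the form `Executing <Task …> took 0.2… seconds`.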
Debugging in production with aiodebug¶
aiodebug is a tiny library for monitoring and testing asyncio programs.
Example¶
[8]:
import aiodebug.log_slow_callbacks

from aiodebug.logging_compat import get_logger

logger = get_logger(__name__)
aiodebug.log_slow_callbacks.enable(
    0.05,
    on_slow_callback=lambda task_name, duration: logger.warning(
        "Task blocked async loop for too long",
        extra={"task_name": task_name, "duration": duration},
    ),
)
Logging¶
aiologger allows non-blocking logging.
Asynchronous Widgets¶
[9]:
def wait_for_change(widget, value):
    future = asyncio.Future()

    def getvalue(change):
        # make the new value available
        future.set_result(change.new)
        widget.unobserve(getvalue, value)

    widget.observe(getvalue, value)
    return future
[10]:
from ipywidgets import IntSlider

slider = IntSlider()


async def f():
    for i in range(10):
        print(f"did work {i}")
        x = await wait_for_change(slider, "value")
        print(f"async function continued with value {x}")


task = asyncio.ensure_future(f())
background_tasks = set()
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)

slider
[10]:
did work 0
async function continued with value 1
did work 1
async function continued with value 3
did work 2
async function continued with value 7
did work 3
async function continued with value 12
did work 4
async function continued with value 21
did work 5
async function continued with value 31
did work 6
async function continued with value 43
did work 7
async function continued with value 55
did work 8
async function continued with value 70
did work 9
async function continued with value 87
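The pattern behind `wait_for_change` does not depend on ipywidgets: a callback resolves a future, and the coroutine awaiting that future then continues. The sketch below shows this with a hypothetical `FakeWidget` class that only imitates the `observe`/`unobserve` interface, so it can be run without a notebook front end:

```python
import asyncio
from types import SimpleNamespace


class FakeWidget:
    """Hypothetical stand-in imitating a widget's observe/unobserve API."""

    def __init__(self):
        self._callbacks = []

    def observe(self, callback, names):
        self._callbacks.append(callback)

    def unobserve(self, callback, names):
        self._callbacks.remove(callback)

    def set_value(self, new):
        # Notify observers with an object that has a ``new`` attribute
        for callback in list(self._callbacks):
            callback(SimpleNamespace(new=new))


def wait_for_change(widget, value):
    future = asyncio.get_running_loop().create_future()

    def getvalue(change):
        future.set_result(change.new)
        widget.unobserve(getvalue, value)

    widget.observe(getvalue, value)
    return future


async def main():
    widget = FakeWidget()
    # Simulate a user interaction shortly after we start waiting
    asyncio.get_running_loop().call_later(0.01, widget.set_value, 5)
    return await wait_for_change(widget, "value")


result = asyncio.run(main())
print(result)  # 5
```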