Skip to content

calgray/athreading

athreading

Test and build PyPI version PyPI python versions License Contributor Covenant

Code style Checked with mypy Pydocstyle Codecov

athreading is a Python library that allows you to run synchronous I/O functions asynchronously using asyncio via background threads. It provides decorators to adapt synchronous functions and generators, enabling them to operate without blocking the event loop.

Features

  • @athreading.call: Adapts a synchronous function into an asynchronous function.
  • @athreading.iterate_callback: Adapts a synchronous function using a callback (push-based stream) into an asynchronous iterator.
  • @athreading.iterate: Adapts a synchronous iterator (pull-based stream) into an asynchronous iterator.
  • @athreading.generate: Adapts a synchronous generator (pull-driven stream) into an asynchronous generator.

Note

Due to Python's Global Interpreter Lock (GIL), this library does not provide multi-threaded CPU parallelism unless using Python 3.9 with nogil or Python 3.13 with free threading enabled.

Installation

athreading can be installed using pip:

pip install athreading

Usage

athreading enables running synchronous functions and iterators asynchronously using asyncio.

1. Adapt a synchronous function

The @athreading.call decorator transforms a synchronous function into an asynchronous function.

>>> import athreading
>>> import time
>>> import math
>>> import asyncio
>>>
>>> @athreading.call
... def compute_sqrt(x):
...     time.sleep(0.05)  # Simulate a blocking I/O operation
...     return math.sqrt(x)
...
>>> async def amain():
...     results = await asyncio.gather(
...         compute_sqrt(2),
...         compute_sqrt(3),
...         compute_sqrt(4)
...     )
...     print(results)

>>> asyncio.run(amain())
[1.4142135623730951, 1.7320508075688772, 2.0]

In this example, compute_sqrt is a synchronous function that sleeps for 0.5 seconds to simulate a blocking I/O operation. By decorating it with @athreading.call, it can be awaited within an asynchronous context, allowing multiple calls to run concurrently without blocking the event loop.

2. Adapt a synchronous function with callback (push-based stream)

The @athreading.iterate_callback decorator transforms a synchronous function using a callback into an asynchronous iterator.

>>> import athreading
>>> import time
>>> import datetime
>>> import asyncio
>>>
>>> @athreading.iterate_callback
... def time_generator(callback, n):
...     for value in range(n):
...         time.sleep(0.05)  # Simulate a blocking I/O operation
...         callback(value)
...
>>> async def aprint_stream(label):
...     async with time_generator(n=10) as stream:
...         async for current_time in stream:
...             print(f"{label}: {current_time}")
...
>>> async def amain():
...
...     await asyncio.gather(
...         aprint_stream("Stream 1"),
...         aprint_stream("Stream 2"),
...         aprint_stream("Stream 3"),
...     )
...
>>> asyncio.run(amain())  # doctest: +ELLIPSIS
Stream ...

3. Adapt a synchronous iterator (pull-based stream)

The @athreading.iterate decorator transforms a synchronous iterator into an asynchronous iterator.

>>> import athreading
>>> import time
>>> import datetime
>>> import asyncio
>>>
>>> @athreading.iterate
... def time_generator(n):
...     for _ in range(n):
...         time.sleep(0.05)  # Simulate a blocking I/O operation
...         yield datetime.datetime.now()
...
>>> async def print_stream(label):
...     async with time_generator(10) as stream:
...         async for current_time in stream:
...             print(f"{label}: {current_time}")
...
>>> async def amain():
...     await asyncio.gather(
...         print_stream("Stream 1"),
...         print_stream("Stream 2"),
...         print_stream("Stream 3"),
...     )
...
>>> asyncio.run(amain())  # doctest: +ELLIPSIS
Stream ...

This example demonstrates running three asynchronous streams concurrently. Each stream processes the time_generator function independently, and the decorator ensures iteration occurs without blocking the event loop.

4. Adapt a synchronous generator (push-and-pull-based stream)

The @athreading.generate decorator converts a synchronous generator function into an asynchronous generator function that supports asend.

>>> import athreading
>>> import time
>>> import asyncio
>>>
>>> @athreading.generate
... def controlled_counter(start, step):
...     current = start
...     while True:
...         time.sleep(0.5)  # Simulate a blocking I/O operation
...         received = yield current
...         current = received if received is not None else current + step
...
>>> async def amain():
...     async with controlled_counter(0, 1) as async_gen:
...         print(await async_gen.asend(None))  # Start the generator
...         print(await async_gen.asend(None))  # Advance with default step
...         print(await async_gen.asend(10))   # Send a new value to control the counter
...         print(await async_gen.asend(None))  # Continue from the new value
...
>>> asyncio.run(amain())
0
1
10
11

This example demonstrates how @athreading.generate transforms a synchronous generator into an asynchronous generator. The asend method sends values to control the generator's state dynamically, enabling interactive workflows while avoiding blocking the event loop.

License

This project is licensed under the BSD-3-Clause License.

For more information and examples, please visit the athreading GitHub repository.

About

Run synchronous I/O functions and generators asynchronously using background threads

Topics

Resources

License

Code of conduct

Contributing

Stars

Watchers

Forks

Packages

No packages published

Contributors 2

  •  
  •  

Languages