TonIO is a multi-threaded async runtime for free-threaded Python, built in Rust on top of the mio crate, and inspired by tinyio, trio and tokio.
Warning: TonIO is currently a work in progress and in alpha state. The APIs are subtle to breaking changes.
Note: TonIO is available on free-threaded Python and Unix systems only.
TonIO supports both using yield and the more canonical async/await notations, with the latter being available as part of the tonio.colored module. Following code snippets show both the usages.
Warning: despite the fact TonIO supports
asyncandawaitnotations, it's not compatible with anyasyncioobject like futures and tasks.
|
import tonio
def wait_and_add(x: int) -> int:
yield tonio.sleep(1)
return x + 1
def foo():
four, five = yield tonio.spawn(
wait_and_add(3),
wait_and_add(4)
)
return four, five
out = tonio.run(foo())
assert out == (4, 5) |
import tonio.colored as tonio
async def wait_and_add(x: int) -> int:
await tonio.sleep(1)
return x + 1
async def foo():
four, five = await tonio.spawn(
wait_and_add(3),
wait_and_add(4)
)
return four, five
out = tonio.run(foo())
assert out == (4, 5) |
Every TonIO program consist of an entrypoint, which should be passed to the run method:
|
import tonio
def main():
yield
print("Hello world")
tonio.run(main()) |
import tonio.colored as tonio
async def main():
await tonio.yield_now()
print("Hellow world")
tonio.run(main()) |
TonIO also provides a main decorator, thus we can rewrite the previous example as:
|
import tonio
@tonio.main
def main():
yield
print("Hello world")
main() |
import tonio.colored as tonio
@tonio.main
async def main():
await tonio.yield_now()
print("Hello world")
main() |
Note: as you can see the
coloredmodule provides the additionalyield_nowcoroutine, a quick way to define a suspension point, given you cannot justyieldas in the non-colored notation.
Note: both
runandmaincan only be called once per program. To run the runtime multiple times in the same program, follow the section below.
TonIO also provides the runtime function, to manually manage the runtime lifecycle:
import tonio
def _run1():
...
async def _run2():
...
def main():
runtime = tonio.runtime()
runtime.run_until_complete(_run1())
runtime.run_until_complete(_run2())The run, main and runtime methods accept options, specifically:
| option name | description | default |
|---|---|---|
context |
enable contextvars usage in coroutines |
False |
signals |
list of signals to listen to | |
threads |
Number of runtime threads | # of CPU cores |
blocking_threadpool_size |
Maximum number of blocking threads | 128 |
blocking_threadpool_idle_ttl |
Idle timeout for blocking threads (in seconds) | 30 |
The core object in TonIO is Event. It's basically a wrapper around an atomic boolean flag, initialised with False. Event provides the following methods:
is_set(): return the value of the flagset(): set the flag toTrueclear(): set the flag toFalsewait(timeout=None): returns a coroutine you can yield on that unblocks when the flag is set toTrueor the timeout expires. Timeout is in seconds.
|
import tonio
@tonio.main
def main():
event = tonio.Event()
def setter():
yield tonio.sleep(1)
event.set()
tonio.spawn(setter())
yield event.wait() |
import tonio.colored as tonio
@tonio.main
async def main():
event = tonio.Event()
async def setter():
await tonio.sleep(1)
event.set()
tonio.spawn(setter())
await event.wait() |
TonIO provides the spawn method to schedule new coroutines onto the runtime:
|
import tonio
def doubv(v):
yield
return v * 2
@tonio.main
def main():
parallel = tonio.spawn(doubv(2), doubv(3))
v3 = yield doubv(4)
v1, v2 = yield parallel
print([v1, v2, v3]) |
import tonio.colored as tonio
async def doubv(v):
await tonio.yield_now()
return v * 2
@tonio.main
async def main():
parallel = tonio.spawn(doubv(2), doubv(3))
v3 = await doubv(4)
v1, v2 = await parallel
print([v1, v2, v3]) |
Coroutines passed to spawn get schedule onto the runtime immediately. Using yield or await on the return value of spawn just waits for the coroutines to complete and retreive the results.
TonIO provides the spawn_blocking method to schedule blocking operations onto the runtime:
|
import tonio
def read_file(path):
with open(file, "r") as f:
return f.read()
@tonio.main
def main():
file_data = yield tonio.spawn_blocking(
read_file,
"sometext.txt"
) |
import tonio.colored as tonio
def read_file(path):
with open(file, "r") as f:
return f.read()
@tonio.main
async def main():
file_data = await tonio.spawn_blocking(
read_file,
"sometext.txt"
) |
TonIO provides the map and map_blocking utilities to spawn the same operation with an iterable of parameters:
|
import tonio
accum = []
def task(no):
yield tonio.sleep(0.5)
accum.append(no * 2)
@tonio.main
def main():
yield tonio.map(task, range(4)) |
import tonio.colored as tonio
accum = []
async def task(no):
await tonio.sleep(0.5)
accum.append(no * 2)
@tonio.main
async def main():
await tonio.map(task, range(4)) |
TonIO provides a scope context, that lets you cancel work spawned within it:
|
import tonio
def slow_push(target, sleep):
yield tonio.sleep(sleep)
target.append(True)
@tonio.main
def main():
values = []
with tonio.scope() as scope:
scope.spawn(_slow_push(values, 0.1))
scope.spawn(_slow_push(values, 2))
yield tonio.sleep(0.2)
scope.cancel()
yield scope()
assert len(values) == 1 |
import tonio.colored as tonio
async def slow_push(target, sleep):
await tonio.sleep(sleep)
target.append(True)
@tonio.main
async def main():
values = []
async with tonio.scope() as scope:
scope.spawn(_slow_push(values, 0.1))
scope.spawn(_slow_push(values, 2))
await tonio.sleep(0.2)
scope.cancel()
assert len(values) == 1 |
When you yield on the scope, it will wait for all the spawned coroutines to end. If the scope was canceled, then all the pending coroutines will be canceled.
Note: as you can see, the colored version of
scopedoesn't require to beawaited, as it will yield when exiting the context.
tonio.time.time(): a function returning the runtime's clock (in seconds, microsecond resolution)tonio.time.sleep(delay): a coroutine you can yield on to sleep (delay is in seconds)tonio.time.timeout(coro, timeout): a coroutine you can yield on returning a tuple(output, success). If the coroutine succeeds in the given time then the pair(output, True)is returned. Otherwise this will return(None, False).
Note:
time.sleepis also exported to the maintoniomodule.
Note: all of the above functions are also present in
tonio.colored.timemodule.
TonIO provides the time.interval function to create interval objects you can yield on a scheduled basis:
|
import tonio
from tonio import time
def some_task():
...
def scheduler():
interval = time.interval(1)
while True:
yield interval.tick()
tonio.spawn(some_task())
@tonio.main
def main():
tonio.spawn(scheduler())
# do some other work |
import tonio.colored as tonio
from tonio.colored import time
async def some_task():
...
async def scheduler():
interval = time.interval(1)
while True:
await interval.tick()
tonio.spawn(some_task())
@tonio.main
async def main():
tonio.spawn(scheduler())
# do some other work |
The interval method first argument is the interval in seconds resolution, and the method also accepts an optional at argument, to delay the first execution at a specific time (from the runtime's clock perspective):
from tonio import time
# tick every 500ms, with the first tick happening in 5 seconds from now
interval = time.interval(0.5, time.time() + 5)Synchronization primitives are exposed in the tonio.sync module.
Implements a classic mutex, or a non-reentrant, single-owner lock for coroutines:
|
import tonio
from tonio import sync
@tonio.main
def main():
# counter can't go above 1
counter = 0
def _count(lock):
nonlocal counter
with (yield lock()):
counter += 1
yield
counter -= 1
lock = sync.Lock()
yield tonio.spawn(*[
_count(lock)
for _ in range(10)
]) |
import tonio.colored as tonio
from tonio.colored import sync
@tonio.main
async def main():
# counter can't go above 1
counter = 0
async def _count(lock):
nonlocal counter
async with lock:
counter += 1
await tonio.yield_now()
counter -= 1
lock = sync.Lock()
await tonio.spawn(*[
_count(lock)
for _ in range(10)
]) |
The Lock object also implements an or_raise method, that will immediately fail when the lock cannot be acquired:
from tonio.exceptions import WouldBlock
try:
with lock.or_raise():
...
except WouldBlock:
...A semaphore for coroutines:
|
import tonio
from tonio import sync
@tonio.main
def main():
# counter can't go above 2
counter = 0
def _count(semaphore):
nonlocal counter
with (yield semaphore()):
counter += 1
yield
counter -= 1
semaphore = sync.Semaphore(2)
yield tonio.spawn(*[
_count(semaphore)
for _ in range(10)
]) |
import tonio.colored as tonio
from tonio.colored import sync
@tonio.main
async def main():
# counter can't go above 2
counter = 0
async def _count(semaphore):
nonlocal counter
async with semaphore:
counter += 1
await tonio.yield_now()
counter -= 1
semaphore = sync.Semaphore(2)
await tonio.spawn(*[
_count(semaphore)
for _ in range(10)
]) |
As for locks, the Semaphore object also implements an or_raise method, that will immediately fail when the lock cannot be acquired:
from tonio.exceptions import WouldBlock
try:
with semaphore.or_raise():
...
except WouldBlock:
...The Semaphore object also implements a tokens method, that returns the number of available tokens.
A barrier for coroutines:
|
import tonio
from tonio import sync
@tonio.main
def main():
barrier = sync.Barrier(3)
count = 0
def _start_at_3():
nonlocal count
count += 1
i = yield barrier.wait()
assert count == 3
return i
yield tonio.spawn(*[
_start_at_3()
for _ in range(3)
]) |
import tonio.colored as tonio
from tonio.colored import sync
@tonio.main
async def main():
barrier = sync.Barrier(3)
count = 0
async def _start_at_3():
nonlocal count
count += 1
i = await barrier.wait()
assert count == 3
return i
await tonio.spawn(*[
_start_at_3()
for _ in range(3)
]) |
The Barrier object also implements a value method, which returns the current value of the barrier.
Multi-producer multi-consumer channels for inter-coroutine communication.
The tonio.sync.channel module provides both a channel and an unbounded constructors.
The main difference between bounded and unbounded channels, as the names suggest, is that while the first will suspend sending messages once the specified length is reached, and it will resume accepting messages once the existing buffer is consumed, the latter will always accept new messages. That's also why, the sender part of a bounded channel is async, while in the unbounded is not.
|
import tonio
from tonio import sync
from tonio.sync import channel
def producer(sender, barrier, offset):
for i in range(20):
message = offset + 1
yield sender.send(message)
yield barrier.wait()
def consumer(receiver):
while True:
try:
message = yield receiver.receive()
print(message)
except Exception:
break
@tonio.main
def main():
def close(sender, barrier):
yield barrier.wait()
sender.close()
sender, receiver = channel.channel(2)
barrier = sync.Barrier(3)
yield tonio.spawn(*[
producer(sender, barrier, 100),
producer(sender, barrier, 200),
consumer(receiver),
consumer(receiver),
consumer(receiver),
consumer(receiver),
close(sender, barrier),
]) |
import tonio.colored as tonio
from tonio.colored import sync
from tonio.colored.sync import channel
async def producer(sender, barrier, offset):
for i in range(20):
message = offset + 1
await sender.send(message)
await barrier.wait()
async def consumer(receiver):
while True:
try:
message = await receiver.receive()
print(message)
except Exception:
break
@tonio.main
async def main():
async def close(sender, barrier):
await barrier.wait()
sender.close()
sender, receiver = channel.channel(2)
barrier = sync.Barrier(3)
await tonio.spawn(*[
producer(sender, barrier, 100),
producer(sender, barrier, 200),
consumer(receiver),
consumer(receiver),
consumer(receiver),
consumer(receiver),
close(sender, barrier),
]) |
|
import tonio
from tonio import sync
from tonio.sync import channel
def producer(sender, barrier, offset):
for i in range(20):
message = offset + 1
sender.send(message)
yield barrier.wait()
def consumer(receiver):
while True:
try:
message = yield receiver.receive()
print(message)
except Exception:
break
@tonio.main
def main():
def close(sender, barrier):
yield barrier.wait()
sender.close()
sender, receiver = channel.unbounded()
barrier = sync.Barrier(3)
yield tonio.spawn(*[
producer(sender, barrier, 100),
producer(sender, barrier, 200),
consumer(receiver),
consumer(receiver),
consumer(receiver),
consumer(receiver),
close(sender, barrier),
]) |
import tonio.colored as tonio
from tonio.colored import sync
from tonio.colored.sync import channel
async def producer(sender, barrier, offset):
for i in range(20):
message = offset + 1
sender.send(message)
await barrier.wait()
async def consumer(receiver):
while True:
try:
message = await receiver.receive()
print(message)
except Exception:
break
@tonio.main
async def main():
async def close(sender, barrier):
await barrier.wait()
sender.close()
sender, receiver = channel.unbounded()
barrier = sync.Barrier(3)
await tonio.spawn(*[
producer(sender, barrier, 100),
producer(sender, barrier, 200),
consumer(receiver),
consumer(receiver),
consumer(receiver),
consumer(receiver),
close(sender, barrier),
]) |
Network primitives are exposed under the tonio.net module.
The high-level network primitives in TonIO are centered aroud the SocketStream and SocketListener objects.
The SocketListener object implements an accept coroutine which returns a SocketStream object.
The SocketStream object implements the send_all and receive_some coroutines to send and receive data.
Both objects implement a close method to shutdown the underlying socket.
You can create and interact with the above objects using some high-level helpers in the net module, specifically:
open_tcp_stream: a coroutine to open aSocketStreamconnected to a TCP endpointopen_unix_socket: a coroutine to open aSocketStreamconnected to an Unix socketopen_tcp_listeners: a coroutine to initialiseSocketListenerobjectsserve_listeners: a coroutine to spawnSocketListeneraccept loops targeting a handlerserve_tcp: a coroutine that joinopen_tcp_listenerandserve_listenersin one call
|
from tonio.net import open_tcp_stream, serve_tcp
def server():
yield serve_tcp(
server_handle,
host='127.0.0.1',
port=8000
)
def server_handle(stream):
# receive some data
data = yield stream.receive_some()
def client():
stream = yield open_tcp_stream(
host='127.0.0.1',
port=8000
)
# send some data
yield stream.send_all("message") |
from tonio.colored.net import open_tcp_stream, serve_tcp
async def server():
await serve_tcp(
server_handle,
host='127.0.0.1',
port=8000
)
async def server_handle(stream):
# receive some data
data = await stream.receive_some()
async def client():
stream = await open_tcp_stream(
host='127.0.0.1',
port=8000
)
# send some data
await stream.send_all("message") |
TonIO implement SSL wrappers around the streaming APIs through primitives in the tonio.net.ssl module.
TonIO provides the SSLStream and SSLListener object wrappers and the following high-level helpers:
open_ssl_over_tcp_stream: a coroutine to open aSSLStreamwrapping a TCPSocketStreamopen_ssl_over_tcp_listeners: a coroutine to initialiseSSLListenerobjectsserve_ssl_over_tcp: a coroutine that joinopen_tls_over_tcp_listenersandserve_listenersin one call
The tonio.net.socket module provides TonIO's basic low-level networking API.
Generally, the API exposed by this module mirrors the standard library socket module.
TonIO socket objects are overall very similar to the standard library socket objects, with the main difference being that blocking methods become coroutines.
|
import tonio
from tonio.net import socket
def server():
sock = socket.socket()
with sock:
yield sock.bind(('127.0.0.1', 8000))
sock.listen()
while True:
client, _ = yield sock.accept()
tonio.spawn(server_handle(client))
def server_handle(connection):
with connection:
# receive some data
data = yield connection.recv(4096)
def client():
sock = socket.socket()
with sock:
yield sock.connect(('127.0.0.1', 8000))
yield sock.send("message") |
import tonio.colored as tonio
from tonio.colored.net import socket
async def server():
sock = socket.socket()
with sock:
await sock.bind(('127.0.0.1', 8000))
sock.listen()
while True:
client, _ = await sock.accept()
tonio.spawn(server_handle(client))
async def server_handle(connection):
with connection:
# receive some data
data = await connection.recv(4096)
async def client():
sock = socket.socket()
with sock:
await sock.connect(('127.0.0.1', 8000))
await sock.send("message") |
TonIO provides a context manager to catch signals.
The usage of such context manager requires to first configure the runtime to listen for such signals:
|
import signal
import tonio
from tonio.time import interval
def sig_handle():
with tonio.signal_receiver(
signal.SIGHUP,
signal.SIGUSR1
) as sigs:
for ev in sigs:
sig = yield ev
if sig == signal.SIGHUP:
...
@tonio.main(
signals=[signal.SIGHUP, signal.SIGUSR1]
)
def main():
tonio.spawn(sig_handle())
ticker = interval(1)
while True:
yield ticker.tick() |
import signal
import tonio.colored as tonio
from tonio.colored.time import interval
async def sig_handle():
with tonio.signal_receiver(
signal.SIGHUP,
signal.SIGUSR1
) as sigs:
async for sig in sigs:
if sig == signal.SIGHUP:
...
@tonio.main(
signals=[signal.SIGHUP, signal.SIGUSR1]
)
async def main():
tonio.spawn(sig_handle())
ticker = interval(1)
while True:
await ticker.tick() |
TonIO is released under the BSD License.