I've been working on AsyncFast, a Python framework for building message-driven APIs with FastAPI-style ergonomics.
The goal is to let you write handlers once and run them unchanged across different brokers and runtimes (Kafka, SQS, MQTT, Redis, containers, Lambda, etc.).
Under the hood it's built on a small spec (AMGI) that plays a similar role to ASGI, but for message-based systems instead of HTTP.
Minimal example
from asyncfast import AsyncFast
from pydantic import BaseModel
app = AsyncFast()
class Payload(BaseModel):
id: str
name: str
@app.channel("channel")
async def handle_channel(payload: Payload) -> None:
print(payload)
Running it locally (Kafka example) pip install asyncfast amgi-aiokafka
asyncfast run app:app kafka --bootstrap-servers localhost:9092
The same application code can be run against different brokers by swapping the runtime adapter.Key ideas:
- broker-agnostic application code
- explicit, typed address parameters / payloads / headers
- portability between long-running consumers and serverless environments
Docs: https://asyncfast.readthedocs.io
Source: https://github.com/asyncfast/amgi
AMGI: https://amgi.readthedocs.io
This is still early and evolving - I'd be especially interested in feedback from people who've built Kafka or event-driven systems and had to deal with portability, or tight broker coupling issues.