Event Bus
FastPluggyEventBus is a lightweight, thread-safe, string-keyed publish/subscribe bus built into
FastPluggy. Plugins communicate through it without knowing each other exist.
Access it via fast_pluggy.events (in hooks) or request.app.state.events (in route handlers).
Subscribing to events
In on_load_complete
from fastpluggy.core.events import Event
from fastpluggy.core.module_base import FastPluggyBaseModule
class MyPlugin(FastPluggyBaseModule):
module_name = "my_plugin"
def on_load_complete(self, fast_pluggy):
fast_pluggy.events.on("user.login", self._on_login)
def _on_login(self, event: Event):
print(f"{event.payload['username']} logged in from {event.payload['ip']}")
The decorator form is also valid:
def on_load_complete(self, fast_pluggy):
@fast_pluggy.events.on("user.login")
def handle(event: Event):
...
Options
| Parameter | Type | Default | Description |
|---|---|---|---|
priority |
int |
0 |
Higher runs first |
once |
bool |
False |
Auto-unsubscribes after first fire |
predicate |
Callable[[Event], bool] |
None |
Only fires when predicate returns True |
fast_pluggy.events.on("user.login", handler, priority=10, once=True)
fast_pluggy.events.on("user.login", handler, predicate=lambda e: e.payload.get("ip") != "127.0.0.1")
Emitting events
# From a route handler
request.app.state.events.emit("user.login", source="auth_user",
user_id=user.id, username=user.username,
ip=request.client.host, remember=True)
# From a plugin method
fast_pluggy.events.emit("report.generated", source=self.module_name, report_id=report.id)
All keyword arguments after source become event.payload.
Namespace listeners
Subscribe to all events sharing a prefix with on_namespace:
# Fires for any event whose name starts with "user."
fast_pluggy.events.on_namespace("user", self._audit)
def _audit(self, event: Event):
print(f"[audit] {event.name} — {event.payload}")
Unsubscribe with off_namespace(prefix, fn).
Unsubscribing
Async listeners
Listeners can be async def. The bus dispatches them correctly in both contexts:
- Inside uvicorn (FastAPI request cycle): scheduled via
loop.create_task()— non-blocking - Worker thread (e.g. background jobs): executed via
asyncio.run()— blocks the thread until done
Temporary subscriptions (tests)
Use the subscribed context manager to register and auto-clean a listener:
with fast_pluggy.events.subscribed("user.login", handler):
fast_pluggy.events.emit("user.login", source="test")
# listener removed here
Declaring emitted events (EventSpec)
Plugins document the events they emit via the emits field. These appear in the admin
Events page as a browsable catalog so other plugin authors can discover what to subscribe to.
from fastpluggy.core.events import EventSpec
from fastpluggy.core.module_base import FastPluggyBaseModule
class MyPlugin(FastPluggyBaseModule):
emits = [
EventSpec(
name="my_plugin.done",
description="Fired when processing completes.",
payload={"job_id": "int", "status": "str"},
),
]
EventSpec fields:
| Field | Type | Description |
|---|---|---|
name |
str |
Topic string, e.g. "my_plugin.done" |
description |
str |
When the event fires |
payload |
dict[str, str] |
Field name → type hint, e.g. {"user_id": "int"} |
Built-in lifecycle events
FastPluggy emits these automatically:
| Event | When | Payload |
|---|---|---|
fp.plugin.loaded |
After each plugin initialises | module_name, version |
fp.all_plugins_loaded |
After all plugins initialise | plugin_count |
fp.templates_ready |
After configure_templates() |
— |
fp.app_ready |
End of load_app() |
— |
def on_load_complete(self, fast_pluggy):
@fast_pluggy.events.on("fp.app_ready")
def ready(event: Event):
logger.info("All plugins loaded, app is ready")
Error handling
By default, listener exceptions are logged with a traceback but do not stop other listeners from running. Override the error handler globally:
Introspection
bus = fast_pluggy.events
bus.listener_count() # total listeners across all topics
bus.topics() # list of subscribed topic names
bus.topic_info() # [{topic, count, listeners, flags}]
bus.namespace_info() # [{prefix, count, listeners}]
The admin Events page at /admin/events visualises all of this at runtime.
Example: cross-plugin audit log
# audit_plugin/plugin.py
class AuditPlugin(FastPluggyBaseModule):
module_name = "audit_plugin"
def on_load_complete(self, fast_pluggy):
fast_pluggy.events.on_namespace("user", self._record)
async def _record(self, event: Event):
await db.insert(AuditLog(
topic=event.name, source=event.source,
data=event.payload, at=event.timestamp,
))
auth_user emits the events, audit_plugin listens — neither plugin knows the other exists.
Adding a third subscriber (e.g. a notification plugin) requires zero changes to either.