Skip to content

Event Bus

FastPluggyEventBus is a lightweight, thread-safe, string-keyed publish/subscribe bus built into FastPluggy. Plugins communicate through it without knowing each other exist.

Access it via fast_pluggy.events (in hooks) or request.app.state.events (in route handlers).


Subscribing to events

In on_load_complete

from fastpluggy.core.events import Event
from fastpluggy.core.module_base import FastPluggyBaseModule

class MyPlugin(FastPluggyBaseModule):
    module_name = "my_plugin"

    def on_load_complete(self, fast_pluggy):
        fast_pluggy.events.on("user.login", self._on_login)

    def _on_login(self, event: Event):
        print(f"{event.payload['username']} logged in from {event.payload['ip']}")

The decorator form is also valid:

def on_load_complete(self, fast_pluggy):
    @fast_pluggy.events.on("user.login")
    def handle(event: Event):
        ...

Options

Parameter Type Default Description
priority int 0 Higher runs first
once bool False Auto-unsubscribes after first fire
predicate Callable[[Event], bool] None Only fires when predicate returns True
fast_pluggy.events.on("user.login", handler, priority=10, once=True)
fast_pluggy.events.on("user.login", handler, predicate=lambda e: e.payload.get("ip") != "127.0.0.1")

Emitting events

# From a route handler
request.app.state.events.emit("user.login", source="auth_user",
                               user_id=user.id, username=user.username,
                               ip=request.client.host, remember=True)

# From a plugin method
fast_pluggy.events.emit("report.generated", source=self.module_name, report_id=report.id)

All keyword arguments after source become event.payload.


Namespace listeners

Subscribe to all events sharing a prefix with on_namespace:

# Fires for any event whose name starts with "user."
fast_pluggy.events.on_namespace("user", self._audit)

def _audit(self, event: Event):
    print(f"[audit] {event.name}{event.payload}")

Unsubscribe with off_namespace(prefix, fn).


Unsubscribing

fast_pluggy.events.off("user.login", handler)
fast_pluggy.events.off_namespace("user", handler)

Async listeners

Listeners can be async def. The bus dispatches them correctly in both contexts:

  • Inside uvicorn (FastAPI request cycle): scheduled via loop.create_task() — non-blocking
  • Worker thread (e.g. background jobs): executed via asyncio.run() — blocks the thread until done
async def _on_login(self, event: Event):
    await notify_service.send(event.payload["username"])

Temporary subscriptions (tests)

Use the subscribed context manager to register and auto-clean a listener:

with fast_pluggy.events.subscribed("user.login", handler):
    fast_pluggy.events.emit("user.login", source="test")
# listener removed here

Declaring emitted events (EventSpec)

Plugins document the events they emit via the emits field. These appear in the admin Events page as a browsable catalog so other plugin authors can discover what to subscribe to.

from fastpluggy.core.events import EventSpec
from fastpluggy.core.module_base import FastPluggyBaseModule

class MyPlugin(FastPluggyBaseModule):
    emits = [
        EventSpec(
            name="my_plugin.done",
            description="Fired when processing completes.",
            payload={"job_id": "int", "status": "str"},
        ),
    ]

EventSpec fields:

Field Type Description
name str Topic string, e.g. "my_plugin.done"
description str When the event fires
payload dict[str, str] Field name → type hint, e.g. {"user_id": "int"}

Built-in lifecycle events

FastPluggy emits these automatically:

Event When Payload
fp.plugin.loaded After each plugin initialises module_name, version
fp.all_plugins_loaded After all plugins initialise plugin_count
fp.templates_ready After configure_templates()
fp.app_ready End of load_app()
def on_load_complete(self, fast_pluggy):
    @fast_pluggy.events.on("fp.app_ready")
    def ready(event: Event):
        logger.info("All plugins loaded, app is ready")

Error handling

By default, listener exceptions are logged with a traceback but do not stop other listeners from running. Override the error handler globally:

fast_pluggy.events.on_error(lambda event, fn, exc: my_logger.error(...))

Introspection

bus = fast_pluggy.events

bus.listener_count()   # total listeners across all topics
bus.topics()           # list of subscribed topic names
bus.topic_info()       # [{topic, count, listeners, flags}]
bus.namespace_info()   # [{prefix, count, listeners}]

The admin Events page at /admin/events visualises all of this at runtime.


Example: cross-plugin audit log

# audit_plugin/plugin.py
class AuditPlugin(FastPluggyBaseModule):
    module_name = "audit_plugin"

    def on_load_complete(self, fast_pluggy):
        fast_pluggy.events.on_namespace("user", self._record)

    async def _record(self, event: Event):
        await db.insert(AuditLog(
            topic=event.name, source=event.source,
            data=event.payload, at=event.timestamp,
        ))

auth_user emits the events, audit_plugin listens — neither plugin knows the other exists. Adding a third subscriber (e.g. a notification plugin) requires zero changes to either.