mnesis.events.bus
bus
In-process pub/sub event bus for Mnesis session lifecycle events.
MnesisEvent
Bases: StrEnum
All event types published by Mnesis components.
Typed payload definitions for each event live in
:mod:mnesis.events.payloads. Import them for static type checking::
    from mnesis.events.payloads import CompactionCompletedPayload
Payload schemas by event:
SESSION_CREATED
:class:~mnesis.events.payloads.SessionCreatedPayload —
session_id: str, model: str
SESSION_CLOSED
:class:~mnesis.events.payloads.SessionClosedPayload —
session_id: str
SESSION_UPDATED, SESSION_DELETED
Reserved — not yet published.
MESSAGE_CREATED
:class:~mnesis.events.payloads.MessageCreatedPayload —
message_id: str, role: str ("user" or "assistant")
MESSAGE_UPDATED, PART_CREATED, PART_UPDATED
Reserved — not yet published.
COMPACTION_TRIGGERED
:class:~mnesis.events.payloads.CompactionTriggeredPayload —
session_id: str, tokens: int
COMPACTION_COMPLETED
:class:~mnesis.events.payloads.CompactionCompletedPayload —
all fields from :class:~mnesis.models.message.CompactionResult
serialized via model_dump().
COMPACTION_FAILED
:class:~mnesis.events.payloads.CompactionFailedPayload —
session_id: str, error: str
PRUNE_COMPLETED
Reserved — not yet published.
DOOM_LOOP_DETECTED
:class:~mnesis.events.payloads.DoomLoopDetectedPayload —
session_id: str, tool: str
MAP_STARTED, MAP_ITEM_COMPLETED, MAP_COMPLETED
:class:~mnesis.events.payloads.MapStartedPayload,
:class:~mnesis.events.payloads.MapItemCompletedPayload,
:class:~mnesis.events.payloads.MapCompletedPayload.
**Important:** These events fire on the *operator's own EventBus*,
not the session bus. Subscribe via the session bus only if you
injected it into the operator at construction time::
    llm_map = LLMMap(config.operators, event_bus=session.event_bus)
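As a rough illustration of how the payload fields listed above might be consumed, the sketch below defines two handlers that read only documented keys. The TypedDict classes are local stand-ins that mirror the documented fields, not the real classes in mnesis.events.payloads, and the function names are hypothetical.

```python
from typing import Any, TypedDict


class CompactionFailedFields(TypedDict):
    """Local stand-in mirroring the documented COMPACTION_FAILED fields."""
    session_id: str
    error: str


class DoomLoopFields(TypedDict):
    """Local stand-in mirroring the documented DOOM_LOOP_DETECTED fields."""
    session_id: str
    tool: str


def describe_compaction_failure(event: Any, payload: CompactionFailedFields) -> str:
    # Handlers receive (event, payload); only documented keys are read here.
    return f"compaction failed in {payload['session_id']}: {payload['error']}"


def describe_doom_loop(event: Any, payload: DoomLoopFields) -> str:
    # Same shape: the event argument is accepted but not needed for the message.
    return f"doom loop in {payload['session_id']} on tool {payload['tool']}"
```

The return values exist only to make the sketch easy to check; a real handler would more likely log or record the message. Registration would presumably go through bus.subscribe with the matching MnesisEvent member.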
EventBus
Simple in-process pub/sub event bus.
Design decisions:
- Sync handlers are called inline within publish().
- Async handlers are scheduled via asyncio.create_task() (fire-and-forget).
- Handler exceptions are logged but never propagate to the publisher.
- Each MnesisSession owns its own EventBus instance for isolation.
Share one instance across sessions for cross-session monitoring.
Example::
    from mnesis.events.bus import EventBus, MnesisEvent

    bus = EventBus()

    def on_compaction(event, payload):
        print(f"Compacted {payload['compacted_message_count']} messages")

    bus.subscribe(MnesisEvent.COMPACTION_COMPLETED, on_compaction)
    bus.publish(MnesisEvent.COMPACTION_COMPLETED, {"compacted_message_count": 10})
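The design decisions above can be approximated with a toy bus. This is a minimal sketch of the described semantics, not the Mnesis implementation: SketchBus, the logger name and the "compaction.completed" event string are all hypothetical stand-ins. It shows that a failing sync handler is logged and does not stop later handlers or the publisher.

```python
import asyncio
import inspect
import logging
from collections import defaultdict
from typing import Any, Callable

logger = logging.getLogger("sketch_bus")


class SketchBus:
    """Toy stand-in for the documented behaviour; not the real EventBus."""

    def __init__(self) -> None:
        self._handlers: dict[Any, list[Callable[..., Any]]] = defaultdict(list)
        self._tasks: set[asyncio.Task] = set()

    def subscribe(self, event: Any, handler: Callable[..., Any]) -> None:
        self._handlers[event].append(handler)

    def publish(self, event: Any, payload: dict[str, Any]) -> None:
        for handler in list(self._handlers[event]):
            try:
                if inspect.iscoroutinefunction(handler):
                    # Fire-and-forget; requires a running event loop.
                    task = asyncio.create_task(handler(event, payload))
                    self._tasks.add(task)  # hold a reference until the task ends
                    task.add_done_callback(self._tasks.discard)
                else:
                    handler(event, payload)  # sync handlers run inline
            except Exception:
                # Logged, never re-raised to the publisher.
                logger.exception("handler failed for %s", event)


calls = []


def broken(event, payload):
    raise ValueError("handler bug")


def recorder(event, payload):
    calls.append(payload["compacted_message_count"])


bus = SketchBus()
bus.subscribe("compaction.completed", broken)
bus.subscribe("compaction.completed", recorder)
bus.publish("compaction.completed", {"compacted_message_count": 10})
```

After the publish call, recorder has still run even though broken raised first. Note that the try block in this sketch only covers scheduling of async handlers; how Mnesis reports exceptions raised inside an async task is not shown here.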
subscribe
Register a handler for a specific event type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | MnesisEvent | The event type to listen for. | required |
| handler | Handler | Callable accepting (event, payload). | required |
subscribe_all
Register a handler for ALL event types.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| handler | Handler | Callable accepting (event, payload). | required |
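A global handler is a natural fit for cross-cutting concerns such as metrics. The sketch below tallies events by type; count_event and the event-name strings are hypothetical, and the handler is called directly here so the example runs without Mnesis installed.

```python
from collections import Counter
from typing import Any

counts: Counter = Counter()


def count_event(event: Any, payload: dict[str, Any]) -> None:
    # str() works for StrEnum members and for plain strings alike.
    counts[str(event)] += 1


# With a real bus this would be registered once: bus.subscribe_all(count_event)
# The calls below simulate three published events (names are placeholders).
count_event("session.created", {"session_id": "s1", "model": "example-model"})
count_event("session.closed", {"session_id": "s1"})
count_event("session.created", {"session_id": "s2", "model": "example-model"})
```

Because the handler receives the event type as its first argument, a single function can serve every event without inspecting the payload.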
unsubscribe
Remove a previously registered handler.
Silent no-op if handler is not registered for event — this
matches the behaviour of most event systems and makes teardown code safe
to call unconditionally without tracking registration state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | MnesisEvent | The event type the handler was registered for. | required |
| handler | Handler | The handler callable to remove. | required |
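Because unsubscribe is a silent no-op for unknown handlers, teardown can run unconditionally. One way to use that, sketched under the assumption that the bus exposes subscribe and unsubscribe as documented here, is a small context manager. The names subscribed and RecordingBus are hypothetical; RecordingBus only records calls so the example is self-contained.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def subscribed(bus: Any, event: Any, handler: Callable[..., Any]) -> Iterator[None]:
    """Subscribe for the duration of a with-block, then always unsubscribe."""
    bus.subscribe(event, handler)
    try:
        yield
    finally:
        # Safe even if the handler was already removed inside the block,
        # since unsubscribe is documented as a silent no-op in that case.
        bus.unsubscribe(event, handler)


class RecordingBus:
    """Hypothetical stand-in that only records calls, for demonstration."""

    def __init__(self) -> None:
        self.log: list[tuple[str, Any]] = []

    def subscribe(self, event: Any, handler: Callable[..., Any]) -> None:
        self.log.append(("subscribe", event))

    def unsubscribe(self, event: Any, handler: Callable[..., Any]) -> None:
        self.log.append(("unsubscribe", event))


bus = RecordingBus()
try:
    with subscribed(bus, "compaction.failed", print):
        raise RuntimeError("simulated failure inside the block")
except RuntimeError:
    pass
```

The unsubscribe call is recorded even though the block raised, which is the property that makes this pattern useful in tests and short-lived monitors.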
unsubscribe_all
Remove handler from all event types and from the global handler list.
Mirrors :meth:subscribe_all for symmetric cleanup. Silent no-op
for any event type or global slot where handler is not registered.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| handler | Handler | The handler callable to remove everywhere. | required |
publish
Publish an event to all registered handlers.
Sync handlers are called immediately in registration order. Async handlers are scheduled as background tasks (non-blocking). Exceptions from any handler are logged and swallowed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | MnesisEvent | The event type to publish. | required |
| payload | dict[str, Any] | Event-specific data dictionary. | required |
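One consequence of scheduling async handlers as background tasks is that they have not run yet when publish returns. The sketch below reproduces only that scheduling step with plain asyncio; publish_like and the event string are hypothetical and stand in for the real method.

```python
import asyncio

order: list[str] = []


async def async_handler(event, payload):
    order.append("async handler ran")


def publish_like(handler, event, payload) -> asyncio.Task:
    # Stand-in for the scheduling step only: create the task and return at once.
    task = asyncio.create_task(handler(event, payload))
    order.append("publish returned")
    return task


async def main() -> None:
    task = publish_like(async_handler, "compaction.completed", {})
    # A fire-and-forget publisher would not wait; awaiting here only lets
    # the demonstration finish deterministically.
    await task


asyncio.run(main())
```

The recorded order is "publish returned" followed by "async handler ran". Code that depends on an async handler's side effects therefore needs to yield to the event loop before checking them, and asyncio.create_task requires a running loop at the time of the call.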