Skip to content

mnesis.events

events

Mnesis event bus.

EventBus

EventBus(logger: BoundLogger | None = None)

Simple in-process pub/sub event bus.

Design decisions: - Sync handlers are called inline within publish(). - Async handlers are scheduled via asyncio.create_task() (fire-and-forget). - Handler exceptions are logged but never propagate to the publisher. - Each MnesisSession owns its own EventBus instance for isolation. Share one instance across sessions for cross-session monitoring.

Example::

bus = EventBus()

def on_compaction(event, payload):
    print(f"Compacted {payload['compacted_message_count']} messages")

bus.subscribe(MnesisEvent.COMPACTION_COMPLETED, on_compaction)
bus.publish(MnesisEvent.COMPACTION_COMPLETED, {"compacted_message_count": 10})

subscribe

subscribe(event: MnesisEvent, handler: Handler) -> None

Register a handler for a specific event type.

Parameters:

Name Type Description Default
event MnesisEvent

The event type to listen for.

required
handler Handler

Callable accepting (event, payload). May be sync or async.

required

subscribe_all

subscribe_all(handler: Handler) -> None

Register a handler for ALL event types.

Parameters:

Name Type Description Default
handler Handler

Callable accepting (event, payload). May be sync or async.

required

unsubscribe

unsubscribe(event: MnesisEvent, handler: Handler) -> None

Remove a previously registered handler.

Silent no-op if handler is not registered for event — this matches the behaviour of most event systems and makes teardown code safe to call unconditionally without tracking registration state.

Parameters:

Name Type Description Default
event MnesisEvent

The event type the handler was registered for.

required
handler Handler

The handler callable to remove.

required

unsubscribe_all

unsubscribe_all(handler: Handler) -> None

Remove handler from all event types and from the global handler list.

Mirrors :meth:subscribe_all for symmetric cleanup. Silent no-op for any event type or global slot where handler is not registered.

Parameters:

Name Type Description Default
handler Handler

The handler callable to remove everywhere.

required

publish

publish(
    event: MnesisEvent, payload: dict[str, Any]
) -> None

Publish an event to all registered handlers.

Sync handlers are called immediately in registration order. Async handlers are scheduled as background tasks (non-blocking). Exceptions from any handler are logged and swallowed.

Parameters:

Name Type Description Default
event MnesisEvent

The event type to publish.

required
payload dict[str, Any]

Event-specific data dictionary.

required

MnesisEvent

Bases: StrEnum

All event types published by Mnesis components.

Typed payload definitions for each event live in :mod:mnesis.events.payloads. Import them for static type checking::

from mnesis.events.payloads import CompactionCompletedPayload

Payload schemas by event:

SESSION_CREATED :class:~mnesis.events.payloads.SessionCreatedPayloadsession_id: str, model: str

SESSION_CLOSED :class:~mnesis.events.payloads.SessionClosedPayloadsession_id: str

SESSION_UPDATED, SESSION_DELETED Reserved — not yet published.

MESSAGE_CREATED :class:~mnesis.events.payloads.MessageCreatedPayloadmessage_id: str, role: str ("user" or "assistant")

MESSAGE_UPDATED, PART_CREATED, PART_UPDATED Reserved — not yet published.

COMPACTION_TRIGGERED :class:~mnesis.events.payloads.CompactionTriggeredPayloadsession_id: str, tokens: int

COMPACTION_COMPLETED :class:~mnesis.events.payloads.CompactionCompletedPayload — all fields from :class:~mnesis.models.message.CompactionResult serialized via model_dump().

COMPACTION_FAILED :class:~mnesis.events.payloads.CompactionFailedPayloadsession_id: str, error: str

PRUNE_COMPLETED Reserved — not yet published.

DOOM_LOOP_DETECTED :class:~mnesis.events.payloads.DoomLoopDetectedPayloadsession_id: str, tool: str

MAP_STARTED, MAP_ITEM_COMPLETED, MAP_COMPLETED :class:~mnesis.events.payloads.MapStartedPayload, :class:~mnesis.events.payloads.MapItemCompletedPayload, :class:~mnesis.events.payloads.MapCompletedPayload.

**Important:** These events fire on the *operator's own EventBus*,
not the session bus. Subscribe via the session bus only if you
injected it into the operator at construction time::

    llm_map = LLMMap(config.operators, event_bus=session.event_bus)

CompactionCompletedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.COMPACTION_COMPLETED.

This is the model_dump() of a :class:mnesis.models.message.CompactionResult. All fields from CompactionResult are present.

summary_message_id instance-attribute

summary_message_id: str

ID of the newly created summary message, or "" on failure.

level_used instance-attribute

level_used: int

Escalation level: 1 = selective LLM, 2 = aggressive LLM, 3 = deterministic.

pruned_tool_outputs instance-attribute

pruned_tool_outputs: int

Number of tool outputs tombstoned by the pruner during this run.

pruned_tokens instance-attribute

pruned_tokens: int

Tokens reclaimed by pruning tool outputs.

CompactionFailedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.COMPACTION_FAILED.

error instance-attribute

error: str

Human-readable error description.

CompactionTriggeredPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.COMPACTION_TRIGGERED.

session_id instance-attribute

session_id: str

The session whose compaction was triggered.

tokens instance-attribute

tokens: int

Current cumulative token count that crossed the threshold.

DoomLoopDetectedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.DOOM_LOOP_DETECTED.

tool instance-attribute

tool: str

Name of the tool call that was repeated past the doom-loop threshold.

MapCompletedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.MAP_COMPLETED (operator bus only).

completed is present only for :class:~mnesis.operators.LLMMap. :class:~mnesis.operators.AgenticMap publishes only total.

total instance-attribute

total: int

Total number of items processed.

completed instance-attribute

completed: NotRequired[int]

Number of items that completed. Present for LLMMap; absent for AgenticMap.

MapItemCompletedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.MAP_ITEM_COMPLETED (operator bus only).

completed instance-attribute

completed: int

Number of items completed so far (including this one).

total instance-attribute

total: int

Total number of items.

success instance-attribute

success: bool

Whether this item succeeded.

MapStartedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.MAP_STARTED (operator bus only).

model is present only for :class:~mnesis.operators.LLMMap. type is present only for :class:~mnesis.operators.AgenticMap (value: "agentic").

total instance-attribute

total: int

Total number of items to process.

model instance-attribute

model: NotRequired[str]

LLM model string. Present for LLMMap; absent for AgenticMap.

type instance-attribute

type: NotRequired[str]

Operator type string. Present for AgenticMap ("agentic"); absent for LLMMap.

MessageCreatedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.MESSAGE_CREATED.

message_id instance-attribute

message_id: str

The newly persisted message's ID.

role instance-attribute

role: str

"user" or "assistant".

SessionClosedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.SESSION_CLOSED.

session_id instance-attribute

session_id: str

The session that was closed.

SessionCreatedPayload

Bases: TypedDict

Payload for :attr:MnesisEvent.SESSION_CREATED.

session_id instance-attribute

session_id: str

The newly created session's ID.

model instance-attribute

model: str

The LLM model string the session was created with.