mnesis.store.immutable

immutable

Append-only SQLite-backed message store.

MnesisStoreError

Bases: Exception

Base class for store errors.

SessionNotFoundError

SessionNotFoundError(session_id: str)

Bases: MnesisStoreError

Raised when a session_id does not exist in the store.

MessageNotFoundError

MessageNotFoundError(message_id: str)

Bases: MnesisStoreError

Raised when a message_id does not exist in the store.

PartNotFoundError

PartNotFoundError(part_id: str)

Bases: MnesisStoreError

Raised when a part_id does not exist in the store.

DuplicateIDError

DuplicateIDError(record_id: str)

Bases: MnesisStoreError

Raised when attempting to insert a record with a duplicate primary key.

ImmutableFieldError

Bases: MnesisStoreError

Raised when attempting to modify an immutable field.

Session

Session(
    id: str,
    parent_id: str | None,
    created_at: int,
    updated_at: int,
    model_id: str,
    provider_id: str,
    agent: str,
    title: str | None,
    is_active: bool,
    metadata: dict[str, Any] | None,
    system_prompt: str = "",
)

Thin data class for session rows (not Pydantic — avoids heavy validation on reads).

RawMessagePart

RawMessagePart(
    id: str,
    message_id: str,
    session_id: str,
    part_type: str,
    content: str,
    part_index: int = 0,
    tool_name: str | None = None,
    tool_call_id: str | None = None,
    tool_state: str | None = None,
    compacted_at: int | None = None,
    started_at: int | None = None,
    completed_at: int | None = None,
    token_estimate: int = 0,
)

Internal storage model for a message part row.

ImmutableStore

ImmutableStore(
    config: StoreConfig, pool: StorePool | None = None
)

Append-only, SQLite-backed message log.

All writes use transactions. Part content is immutable — only status metadata (tool_state, compacted_at, timing, output) can be updated via update_part_status().

When a StorePool is supplied, the store borrows a shared connection from it. close() releases a private (non-pooled) connection but leaves pooled connections open, because the pool owns their lifetime. This lets many concurrent MnesisSession objects share one physical SQLite connection without "database is locked" errors, and maps cleanly to a PostgreSQL connection pool in the future.

Usage (standalone):

store = ImmutableStore(StoreConfig())
await store.initialize()
try:
    await store.create_session(session_id)   # session_id: str, e.g. "sess_01JXYZ..."
    await store.append_message(msg)          # msg: Message with a pre-generated id
finally:
    await store.close()   # closes the private connection

Usage (with pool):

pool = StorePool()
store_a = ImmutableStore(config, pool=pool)
store_b = ImmutableStore(config, pool=pool)
await store_a.initialize()   # opens the shared connection once
await store_b.initialize()   # reuses it
await store_a.close()        # no-op — pool owns the connection
await store_b.close()        # no-op
await pool.close_all()       # actually closes the connection

initialize async

initialize() -> None

Open (or borrow) a database connection and apply the schema.

When a pool is set, the connection is acquired from the pool and the schema is applied idempotently (CREATE TABLE IF NOT EXISTS). When no pool is set, a private connection is opened; the caller is responsible for calling close() to release it.

Raises:

  • Error: If the database cannot be opened or the schema fails.
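
The idempotent schema step can be illustrated with the standard-library sqlite3 module. This is a minimal sketch, not the mnesis implementation: the `SCHEMA` string, the `apply_schema` helper, and the single `sessions` table are hypothetical stand-ins.

```python
import sqlite3

# Hypothetical one-table schema; the real mnesis schema is not reproduced here.
SCHEMA = """
CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,
    created_at INTEGER NOT NULL
);
"""


def apply_schema(conn: sqlite3.Connection) -> None:
    """Create missing tables; a no-op for tables that already exist."""
    conn.executescript(SCHEMA)


conn = sqlite3.connect(":memory:")
apply_schema(conn)  # e.g. the first store to initialize on a shared connection
conn.execute("INSERT INTO sessions (id, created_at) VALUES ('sess_a', 0)")
conn.commit()
apply_schema(conn)  # a second initialize() on the same connection changes nothing
row_count = conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0]
```

Because every statement uses IF NOT EXISTS, several stores can apply the schema to the same pooled connection without disturbing existing rows.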

close async

close() -> None

Release the database connection.

If the connection is owned by a pool this is a no-op — the pool manages the connection lifetime. If the connection is private (no pool was supplied) it is closed and set to None.
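
The ownership rule for close() can be sketched as follows. `SketchPool` and `SketchStore` are hypothetical stand-ins written for this illustration; they only mimic the documented behavior (pooled close is a no-op, private close sets the connection to None) and say nothing about how StorePool or ImmutableStore are actually implemented.

```python
import sqlite3
from typing import Optional


class SketchPool:
    """Hypothetical stand-in for StorePool: owns one shared connection."""

    def __init__(self) -> None:
        self._conn: Optional[sqlite3.Connection] = None

    def acquire(self) -> sqlite3.Connection:
        if self._conn is None:
            self._conn = sqlite3.connect(":memory:")
        return self._conn

    def close_all(self) -> None:
        if self._conn is not None:
            self._conn.close()
            self._conn = None


class SketchStore:
    """Hypothetical stand-in for ImmutableStore's connection handling."""

    def __init__(self, pool: Optional[SketchPool] = None) -> None:
        self._pool = pool
        self.conn: Optional[sqlite3.Connection] = None

    def initialize(self) -> None:
        if self._pool is not None:
            self.conn = self._pool.acquire()  # borrowed from the pool
        else:
            self.conn = sqlite3.connect(":memory:")  # private connection

    def close(self) -> None:
        if self._pool is not None:
            return  # pooled: the pool owns the connection lifetime
        if self.conn is not None:
            self.conn.close()
            self.conn = None


pool = SketchPool()
store_a, store_b = SketchStore(pool), SketchStore(pool)
store_a.initialize()
store_b.initialize()
store_a.close()  # no-op, so store_b can keep using the shared connection
still_usable = store_b.conn.execute("SELECT 1").fetchone()[0] == 1
pool.close_all()
```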

create_session async

create_session(
    id: str,
    *,
    model_id: str = "",
    provider_id: str = "",
    agent: str = "default",
    parent_id: str | None = None,
    title: str | None = None,
    metadata: dict[str, Any] | None = None,
    system_prompt: str = "",
) -> Session

Insert a new session row.

Parameters:

  • id (str, required): ULID-based session ID (e.g. sess_01JXYZ...).
  • model_id (str, default ''): The model string used by this session.
  • provider_id (str, default ''): The provider (e.g. anthropic, openai).
  • agent (str, default 'default'): Agent role name.
  • parent_id (str | None, default None): Optional parent session ID for sub-sessions.
  • title (str | None, default None): Optional human-readable title.
  • metadata (dict[str, Any] | None, default None): Optional JSON-serializable metadata dict.
  • system_prompt (str, default ''): System prompt for this session. Persisted so that load() can restore the original prompt on resume.

Returns:

  • Session: The created Session.

Raises:

  • DuplicateIDError: If a session with this ID already exists.
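
One plausible way a duplicate primary key could surface as DuplicateIDError is sketched below with plain sqlite3. The `DuplicateIDError` class here is a local stand-in that mirrors the documented signature, and `insert_session` and the one-column table are hypothetical; the real store's internals may differ.

```python
import sqlite3


class DuplicateIDError(Exception):
    """Local stand-in that mirrors the documented exception's signature."""

    def __init__(self, record_id: str) -> None:
        super().__init__(f"duplicate record id: {record_id}")
        self.record_id = record_id


def insert_session(conn: sqlite3.Connection, session_id: str) -> None:
    """Insert a row, translating a primary-key clash into DuplicateIDError."""
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("INSERT INTO sessions (id) VALUES (?)", (session_id,))
    except sqlite3.IntegrityError as exc:
        raise DuplicateIDError(session_id) from exc


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sessions (id TEXT PRIMARY KEY)")
insert_session(conn, "sess_a")
try:
    insert_session(conn, "sess_a")
    duplicate_rejected = False
except DuplicateIDError as err:
    duplicate_rejected = err.record_id == "sess_a"
```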

get_session async

get_session(session_id: str) -> Session

Fetch a session by ID.

Raises:

  • SessionNotFoundError: If no session with this ID exists.

list_sessions async

list_sessions(
    *,
    parent_id: str | None = None,
    active_only: bool = True,
    limit: int = 100,
    offset: int = 0,
) -> list[Session]

List sessions, newest first.

Parameters:

  • parent_id (str | None, default None): Filter to sessions with this parent (sub-sessions).
  • active_only (bool, default True): Exclude soft-deleted sessions when True.
  • limit (int, default 100): Maximum number of sessions to return.
  • offset (int, default 0): Number of sessions to skip (for pagination).

Returns:

  • list[Session]: List of Session objects ordered by created_at DESC.
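
The filter and pagination semantics can be sketched as a single query. This is an illustration under assumptions: the table layout and the `list_session_ids` helper are hypothetical, and treating `parent_id=None` as "no parent filter" is a guess based on the default value.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sessions "
    "(id TEXT PRIMARY KEY, parent_id TEXT, created_at INTEGER, is_active INTEGER)")
conn.executemany(
    "INSERT INTO sessions VALUES (?, ?, ?, ?)",
    [("s1", None, 100, 1), ("s2", None, 200, 0),
     ("s3", None, 300, 1), ("s4", "s1", 400, 1)])


def list_session_ids(conn, *, parent_id=None, active_only=True, limit=100, offset=0):
    clauses, params = [], []
    if parent_id is not None:  # assumption: None means "no parent filter"
        clauses.append("parent_id = ?")
        params.append(parent_id)
    if active_only:
        clauses.append("is_active = 1")  # hide soft-deleted sessions
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    sql = f"SELECT id FROM sessions {where} ORDER BY created_at DESC LIMIT ? OFFSET ?"
    return [row[0] for row in conn.execute(sql, (*params, limit, offset))]


page_1 = list_session_ids(conn, limit=2, offset=0)
page_2 = list_session_ids(conn, limit=2, offset=2)
```

Advancing `offset` by `limit` walks through the result set one page at a time, newest sessions first.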

soft_delete_session async

soft_delete_session(session_id: str) -> None

Mark a session as inactive (is_active=0). Messages and parts are retained.

append_message async

append_message(message: Message) -> Message

Append a message to the log.

For non-summary messages a context_items row is inserted atomically in the same transaction so that the context-assembly view stays consistent with the message log. Summary messages are NOT inserted here — the compaction engine inserts their context_items row separately as part of the atomic context swap.

Parameters:

  • message (Message, required): The Message to persist. Must have a pre-generated id.

Returns:

  • Message: The stored message (unmodified).

Raises:

  • SessionNotFoundError: If session_id does not exist.
  • DuplicateIDError: If a message with this ID already exists.
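
The atomic pairing of a message row with its context_items row can be sketched with sqlite3's connection context manager. The table layouts, the `append_message` helper, and the position-assignment rule (next free position) are assumptions made for this illustration, not the store's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE messages (id TEXT PRIMARY KEY, session_id TEXT, is_summary INTEGER);
CREATE TABLE context_items (
    session_id TEXT, item_type TEXT, item_id TEXT, position INTEGER,
    UNIQUE (session_id, item_id)
);
""")


def append_message(conn, message_id, session_id, is_summary=False):
    with conn:  # both inserts commit together, or both roll back
        conn.execute(
            "INSERT INTO messages VALUES (?, ?, ?)",
            (message_id, session_id, int(is_summary)))
        if not is_summary:
            conn.execute(
                "INSERT INTO context_items "
                "SELECT ?, 'message', ?, COALESCE(MAX(position), -1) + 1 "
                "FROM context_items WHERE session_id = ?",
                (session_id, message_id, session_id))


append_message(conn, "m1", "s")
append_message(conn, "m2", "s")
append_message(conn, "sum1", "s", is_summary=True)  # no context row here

# Force the second insert to fail and check that the first is rolled back.
conn.execute("INSERT INTO context_items VALUES ('s', 'message', 'm3', 99)")
conn.commit()
try:
    append_message(conn, "m3", "s")
except sqlite3.IntegrityError:
    pass

message_ids = [r[0] for r in conn.execute("SELECT id FROM messages ORDER BY rowid")]
context_ids = [r[0] for r in conn.execute(
    "SELECT item_id FROM context_items ORDER BY position")]
```

In the failing case the message row for `m3` never becomes visible, which is the consistency property the paragraph above describes.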

append_part async

append_part(part: RawMessagePart) -> RawMessagePart

Append a single part to a message.

Assigns part_index as max(existing_parts) + 1 within a transaction.

Parameters:

  • part (RawMessagePart, required): The RawMessagePart to persist.

Returns:

  • RawMessagePart: The stored part with its assigned part_index.

Raises:

  • MessageNotFoundError: If message_id does not exist.
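
The part_index assignment can be sketched as a single INSERT ... SELECT so that computing the index and storing the row happen in one statement. The table layout and helper are hypothetical, and starting at 0 for the first part is an assumption (suggested by the `part_index: int = 0` default on RawMessagePart).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE message_parts "
    "(id TEXT PRIMARY KEY, message_id TEXT, part_index INTEGER, content TEXT)")


def append_part(conn, part_id, message_id, content):
    with conn:
        # One INSERT ... SELECT computes and stores the index atomically.
        conn.execute(
            "INSERT INTO message_parts (id, message_id, part_index, content) "
            "SELECT ?, ?, COALESCE(MAX(part_index), -1) + 1, ? "
            "FROM message_parts WHERE message_id = ?",
            (part_id, message_id, content, message_id))
    (index,) = conn.execute(
        "SELECT part_index FROM message_parts WHERE id = ?", (part_id,)).fetchone()
    return index


indices = [append_part(conn, "p1", "m1", "a"),
           append_part(conn, "p2", "m1", "b"),
           append_part(conn, "p3", "m2", "c")]
```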

update_part_status async

update_part_status(
    part_id: str,
    *,
    tool_state: str | None = None,
    compacted_at: int | None = None,
    started_at: int | None = None,
    completed_at: int | None = None,
    output: str | None = None,
    error_message: str | None = None,
) -> None

Update mutable status fields on a part.

This is the only permitted mutation of a persisted part.

Parameters:

  • part_id (str, required): The part to update.
  • tool_state (str | None, default None): New tool lifecycle state.
  • compacted_at (int | None, default None): Unix ms timestamp; sets the pruning tombstone.
  • started_at (int | None, default None): Tool execution start timestamp.
  • completed_at (int | None, default None): Tool execution end timestamp.
  • output (str | None, default None): Tool result output string (merged into content JSON).
  • error_message (str | None, default None): Tool error message (merged into content JSON).

Raises:

  • PartNotFoundError: If the part does not exist.
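
What "merged into content JSON" might look like is sketched below. The `merge_status` helper and the JSON key names (`output`, `error_message`, `tool_name`, `input`) are assumptions for illustration; the actual layout of a part's content JSON is not specified here. The sketch also assumes that a `None` argument leaves the stored value unchanged.

```python
import json


def merge_status(content_json, *, output=None, error_message=None):
    """Return content JSON with the given status fields merged in."""
    content = json.loads(content_json)
    if output is not None:  # assumption: None means "leave unchanged"
        content["output"] = output
    if error_message is not None:
        content["error_message"] = error_message
    return json.dumps(content)


original = json.dumps({"tool_name": "search", "input": {"q": "sqlite"}})
merged = json.loads(merge_status(original, output="3 results"))
```

Existing keys are preserved, so the original tool call remains readable after the result is attached.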

update_message_tokens async

update_message_tokens(
    message_id: str,
    tokens: TokenUsage,
    cost: float,
    finish_reason: str,
) -> None

Update token usage fields on an assistant message after streaming completes.

Parameters:

  • message_id (str, required): The message to update.
  • tokens (TokenUsage, required): Final token usage from the provider response.
  • cost (float, required): Dollar cost of the response.
  • finish_reason (str, required): Provider finish reason (e.g. "stop", "max_tokens").

get_message async

get_message(message_id: str) -> Message

Fetch a single message by ID.

Raises:

  • MessageNotFoundError: If no message with this ID exists.

get_messages async

get_messages(
    session_id: str, *, since_message_id: str | None = None
) -> list[Message]

Fetch all messages for a session in chronological order.

Parameters:

  • session_id (str, required): The session to query.
  • since_message_id (str | None, default None): If provided, returns only messages created after the message with this ID.

Returns:

  • list[Message]: List of Message objects ordered by created_at ASC.

get_parts async

get_parts(message_id: str) -> list[RawMessagePart]

Fetch all parts for a message, ordered by part_index ASC.

get_messages_with_parts async

get_messages_with_parts(
    session_id: str, *, since_message_id: str | None = None
) -> list[MessageWithParts]

Efficiently fetch messages and their parts in two queries (no N+1).

Algorithm:

  1. Fetch all matching messages.
  2. Fetch all parts for those message IDs in a single IN query.
  3. Group parts by message ID and join in Python.
  4. Deserialize part JSON into typed MessagePart objects.

Parameters:

  • session_id (str, required): The session to query.
  • since_message_id (str | None, default None): If provided, returns only messages after this boundary.

Returns:

  • list[MessageWithParts]: List of MessageWithParts in chronological order.
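
The two-query pattern can be sketched with plain sqlite3. The table layouts and the `messages_with_parts` helper are hypothetical, and plain dicts stand in for the typed MessagePart objects; the sketch only shows how N+1 queries are avoided.

```python
import json
import sqlite3
from collections import defaultdict

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE messages (id TEXT PRIMARY KEY, session_id TEXT, created_at INTEGER);
CREATE TABLE message_parts (
    id TEXT PRIMARY KEY, message_id TEXT, part_index INTEGER, content TEXT
);
INSERT INTO messages VALUES ('m1', 's', 1), ('m2', 's', 2), ('m3', 'other', 3);
INSERT INTO message_parts VALUES
    ('p1', 'm1', 0, '{"text": "hi"}'),
    ('p2', 'm2', 1, '{"text": "second"}'),
    ('p3', 'm2', 0, '{"text": "first"}');
""")


def messages_with_parts(conn, session_id):
    # Query 1: all messages for the session, oldest first.
    ids = [r[0] for r in conn.execute(
        "SELECT id FROM messages WHERE session_id = ? ORDER BY created_at ASC",
        (session_id,))]
    if not ids:
        return []
    # Query 2: every part for those messages in a single IN query.
    placeholders = ", ".join("?" for _ in ids)
    rows = conn.execute(
        f"SELECT message_id, content FROM message_parts "
        f"WHERE message_id IN ({placeholders}) ORDER BY part_index ASC", ids)
    # Group by message ID and decode each part's JSON in Python.
    parts = defaultdict(list)
    for message_id, content in rows:
        parts[message_id].append(json.loads(content))
    return [(mid, parts[mid]) for mid in ids]


result = messages_with_parts(conn, "s")
```

The number of SQL statements stays at two regardless of how many messages the session contains.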

get_messages_with_parts_by_ids async

get_messages_with_parts_by_ids(
    message_ids: list[str],
) -> list[MessageWithParts]

Fetch a specific set of messages (and their parts) by ID.

Returns only the requested messages in the same order as message_ids, making context assembly O(k) in the number of context items rather than O(n) in the total session length.

Parameters:

  • message_ids (list[str], required): Ordered list of message IDs to fetch.

Returns:

  • list[MessageWithParts]: List of MessageWithParts in the order of message_ids.
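
Since an SQL IN query does not guarantee result order, the caller's order has to be re-imposed after the fetch. A minimal sketch, with a hypothetical table and helper; silently skipping unknown IDs is an assumption, as the documented behavior for missing IDs is not stated.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (id TEXT PRIMARY KEY, body TEXT)")
conn.executemany(
    "INSERT INTO messages VALUES (?, ?)",
    [("m1", "one"), ("m2", "two"), ("m3", "three")])


def fetch_in_order(conn, message_ids):
    if not message_ids:
        return []
    placeholders = ", ".join("?" for _ in message_ids)
    rows = conn.execute(
        f"SELECT id, body FROM messages WHERE id IN ({placeholders})", message_ids)
    by_id = {row[0]: row for row in rows}
    # Re-impose the caller's order; skipping unknown IDs is an assumption.
    return [by_id[mid] for mid in message_ids if mid in by_id]


ordered = fetch_in_order(conn, ["m3", "m1"])
```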

get_last_summary_message async

get_last_summary_message(session_id: str) -> Message | None

Return the most recent message with is_summary=True, or None.

Uses the partial index on (session_id, is_summary) for O(log n) lookup.
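
A partial index of the kind mentioned above could look like the following sketch. The index name, table layout, and `last_summary_id` helper are hypothetical; only the idea (index the summary rows alone, then take the newest one) comes from the description.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE messages (
    id TEXT PRIMARY KEY, session_id TEXT, is_summary INTEGER, created_at INTEGER
);
-- Partial index: only summary rows are indexed, so it stays small.
CREATE INDEX idx_messages_summary ON messages (session_id, is_summary)
    WHERE is_summary = 1;
INSERT INTO messages VALUES
    ('m1', 's', 0, 1), ('sum1', 's', 1, 2), ('m2', 's', 0, 3), ('sum2', 's', 1, 4);
""")


def last_summary_id(conn, session_id):
    row = conn.execute(
        "SELECT id FROM messages WHERE session_id = ? AND is_summary = 1 "
        "ORDER BY created_at DESC LIMIT 1", (session_id,)).fetchone()
    return row[0] if row else None


latest = last_summary_id(conn, "s")
```

The query repeats the index's `is_summary = 1` condition literally, which is what allows SQLite to consider a partial index for it.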

store_file_reference async

store_file_reference(ref: FileReference) -> FileReference

Insert or replace a file reference (upsert by content_id).

Parameters:

  • ref (FileReference, required): The FileReference to persist.

Returns:

  • FileReference: The stored reference.
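
Upsert-by-key semantics can be sketched with SQLite's INSERT OR REPLACE. The three-column table and the helper's signature are hypothetical simplifications; the real FileReference has its own fields.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE file_references "
    "(content_id TEXT PRIMARY KEY, path TEXT, stored_at INTEGER)")


def store_file_reference(conn, content_id, path, stored_at):
    with conn:
        # A row with the same content_id is replaced rather than duplicated.
        conn.execute(
            "INSERT OR REPLACE INTO file_references VALUES (?, ?, ?)",
            (content_id, path, stored_at))


store_file_reference(conn, "abc123", "/tmp/a.txt", 1)
store_file_reference(conn, "abc123", "/tmp/b.txt", 2)
rows = conn.execute("SELECT content_id, path, stored_at FROM file_references").fetchall()
```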

get_file_reference async

get_file_reference(content_id: str) -> FileReference | None

Fetch a file reference by content hash. Returns None if not found.

get_file_reference_by_path async

get_file_reference_by_path(
    path: str,
) -> FileReference | None

Fetch the most recently stored file reference for a given path.

get_context_items async

get_context_items(session_id: str) -> list[tuple[str, str]]

Return the ordered context items for a session.

Each item is a (item_type, item_id) pair in ascending position order — the canonical sequence that ContextBuilder assembles into an LLM message list.

Parameters:

  • session_id (str, required): The session to query.

Returns:

  • list[tuple[str, str]]: List of (item_type, item_id) tuples ordered by position ASC.

swap_context_items async

swap_context_items(
    session_id: str,
    remove_item_ids: list[str],
    summary_id: str,
) -> None

Atomically replace a set of context items with a single summary row.

This is the core of the O(1) compaction commit. Within a single transaction, all rows whose item_id is in remove_item_ids are deleted and one new 'summary' row is inserted at the minimum position of the removed items, so the summary occupies the slot of the first item being removed. Items outside the compacted span keep their original positions, so no rows need to be shifted.

Used in two scenarios:

  • Leaf summarisation: remove_item_ids contains the message IDs that were compacted (item_type='message').
  • Condensation: remove_item_ids contains the parent summary node IDs (item_type='summary') being merged.

Parameters:

  • session_id (str, required): The session being compacted.
  • remove_item_ids (list[str], required): Context item IDs to remove (any item_type).
  • summary_id (str, required): ID of the SummaryNode/condensed node that replaces them.
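
The swap can be sketched with plain sqlite3 as below. The four-column `context_items` layout and this free-standing `swap_context_items` function are assumptions for illustration, as is raising ValueError when none of the IDs are present; the sketch only demonstrates the "insert at the minimum removed position, shift nothing" idea.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE context_items "
    "(session_id TEXT, item_type TEXT, item_id TEXT, position INTEGER)")
conn.executemany(
    "INSERT INTO context_items VALUES ('s', 'message', ?, ?)",
    [("m1", 0), ("m2", 1), ("m3", 2), ("m4", 3)])
conn.commit()


def swap_context_items(conn, session_id, remove_item_ids, summary_id):
    placeholders = ", ".join("?" for _ in remove_item_ids)
    scope = f"session_id = ? AND item_id IN ({placeholders})"
    args = (session_id, *remove_item_ids)
    with conn:  # the delete and the insert commit together or not at all
        (slot,) = conn.execute(
            f"SELECT MIN(position) FROM context_items WHERE {scope}", args).fetchone()
        if slot is None:
            raise ValueError("none of remove_item_ids are in the context")
        conn.execute(f"DELETE FROM context_items WHERE {scope}", args)
        # The summary takes the slot of the first removed item.
        conn.execute(
            "INSERT INTO context_items VALUES (?, 'summary', ?, ?)",
            (session_id, summary_id, slot))


swap_context_items(conn, "s", ["m2", "m3"], "sum1")
after = conn.execute(
    "SELECT item_type, item_id, position FROM context_items "
    "WHERE session_id = 's' ORDER BY position ASC").fetchall()
```

Note that `m4` keeps position 3 after the swap: the gap left at position 2 is harmless because ordering depends only on relative position.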