mnesis.store

store

Mnesis persistence layer.

DuplicateIDError

DuplicateIDError(record_id: str)

Bases: MnesisStoreError

Raised when attempting to insert a record with a duplicate primary key.

ImmutableFieldError

Bases: MnesisStoreError

Raised when attempting to modify an immutable field.

ImmutableStore

ImmutableStore(
    config: StoreConfig, pool: StorePool | None = None
)

Append-only, SQLite-backed message log.

All writes use transactions. Part content is immutable — only status metadata (tool_state, compacted_at, timing, output) can be updated via update_part_status().

When a StorePool is supplied, the store borrows a shared connection from it: close() releases any local-only connection but leaves pooled connections open (the pool owns their lifetime). This lets many concurrent MnesisSession objects share one physical SQLite connection without "database is locked" errors, and maps cleanly to a PostgreSQL connection pool in the future.

Usage (standalone)::

store = ImmutableStore(StoreConfig())
await store.initialize()
try:
    await store.create_session(session_id)
    await store.append_message(msg)
finally:
    await store.close()   # closes the private connection

Usage (with pool)::

pool = StorePool()
store_a = ImmutableStore(config, pool=pool)
store_b = ImmutableStore(config, pool=pool)
await store_a.initialize()   # opens the shared connection once
await store_b.initialize()   # reuses it
await store_a.close()        # no-op — pool owns the connection
await store_b.close()        # no-op
await pool.close_all()       # actually closes the connection

initialize async

initialize() -> None

Open (or borrow) a database connection and apply the schema.

When a pool is set, the connection is acquired from the pool and the schema is applied idempotently (CREATE TABLE IF NOT EXISTS). When no pool is set, a private connection is opened; the caller is responsible for calling close() to release it.

Raises:

Type Description
Error

If the database cannot be opened or the schema fails.

close async

close() -> None

Release the database connection.

If the connection is owned by a pool this is a no-op — the pool manages the connection lifetime. If the connection is private (no pool was supplied) it is closed and set to None.

create_session async

create_session(
    id: str,
    *,
    model_id: str = "",
    provider_id: str = "",
    agent: str = "default",
    parent_id: str | None = None,
    title: str | None = None,
    metadata: dict[str, Any] | None = None,
    system_prompt: str = "",
) -> Session

Insert a new session row.

Parameters:

Name Type Description Default
id str

ULID-based session ID (e.g. sess_01JXYZ...).

required
model_id str

The model string used by this session.

''
provider_id str

The provider (e.g. anthropic, openai).

''
agent str

Agent role name (default "default").

'default'
parent_id str | None

Optional parent session ID for sub-sessions.

None
title str | None

Optional human-readable title.

None
metadata dict[str, Any] | None

Optional JSON-serializable metadata dict.

None
system_prompt str

System prompt for this session. Persisted so that load() can restore the original prompt on resume.

''

Returns:

Type Description
Session

The created Session.

Raises:

Type Description
DuplicateIDError

If a session with this ID already exists.

get_session async

get_session(session_id: str) -> Session

Fetch a session by ID.

Raises:

Type Description
SessionNotFoundError

If no session with this ID exists.

list_sessions async

list_sessions(
    *,
    parent_id: str | None = None,
    active_only: bool = True,
    limit: int = 100,
    offset: int = 0,
) -> list[Session]

List sessions, newest first.

Parameters:

Name Type Description Default
parent_id str | None

Filter to sessions with this parent (sub-sessions).

None
active_only bool

Exclude soft-deleted sessions when True.

True
limit int

Maximum number of sessions to return.

100
offset int

Number of sessions to skip (for pagination).

0

Returns:

Type Description
list[Session]

List of Session objects ordered by created_at DESC.

soft_delete_session async

soft_delete_session(session_id: str) -> None

Mark a session as inactive (is_active=0). Messages and parts are retained.

append_message async

append_message(message: Message) -> Message

Append a message to the log.

For non-summary messages a context_items row is inserted atomically in the same transaction so that the context-assembly view stays consistent with the message log. Summary messages are NOT inserted here — the compaction engine inserts their context_items row separately as part of the atomic context swap.

Parameters:

Name Type Description Default
message Message

The Message to persist. Must have a pre-generated id.

required

Returns:

Type Description
Message

The stored message (unmodified).

Raises:

Type Description
SessionNotFoundError

If session_id does not exist.

DuplicateIDError

If a message with this ID already exists.

append_part async

append_part(part: RawMessagePart) -> RawMessagePart

Append a single part to a message.

Assigns part_index as max(existing_parts) + 1 within a transaction.
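The index rule can be sketched in a few lines of plain Python (the function name is illustrative, not part of the store's API):

```python
def next_part_index(existing_indices: list[int]) -> int:
    """Compute the part_index for a newly appended part.

    Mirrors the documented rule: max(existing parts) + 1, which
    starts at 0 for a message that has no parts yet.
    """
    return max(existing_indices, default=-1) + 1
```

Running this inside the same transaction that inserts the part is what makes the index assignment race-free.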

Parameters:

Name Type Description Default
part RawMessagePart

The RawMessagePart to persist.

required

Returns:

Type Description
RawMessagePart

The stored part with its assigned part_index.

Raises:

Type Description
MessageNotFoundError

If message_id does not exist.

update_part_status async

update_part_status(
    part_id: str,
    *,
    tool_state: str | None = None,
    compacted_at: int | None = None,
    started_at: int | None = None,
    completed_at: int | None = None,
    output: str | None = None,
    error_message: str | None = None,
) -> None

Update mutable status fields on a part.

This is the only permitted mutation of persisted data.

Parameters:

Name Type Description Default
part_id str

The part to update.

required
tool_state str | None

New tool lifecycle state.

None
compacted_at int | None

Unix ms timestamp — sets the pruning tombstone.

None
started_at int | None

Tool execution start timestamp.

None
completed_at int | None

Tool execution end timestamp.

None
output str | None

Tool result output string (merged into content JSON).

None
error_message str | None

Tool error message (merged into content JSON).

None

Raises:

Type Description
PartNotFoundError

If the part does not exist.

update_message_tokens async

update_message_tokens(
    message_id: str,
    tokens: TokenUsage,
    cost: float,
    finish_reason: str,
) -> None

Update token usage fields on an assistant message after streaming completes.

Parameters:

Name Type Description Default
message_id str

The message to update.

required
tokens TokenUsage

Final token usage from the provider response.

required
cost float

Dollar cost of the response.

required
finish_reason str

Provider finish reason (e.g. "stop", "max_tokens").

required

get_message async

get_message(message_id: str) -> Message

Fetch a single message by ID.

Raises:

Type Description
MessageNotFoundError

If no message with this ID exists.

get_messages async

get_messages(
    session_id: str, *, since_message_id: str | None = None
) -> list[Message]

Fetch all messages for a session in chronological order.

Parameters:

Name Type Description Default
session_id str

The session to query.

required
since_message_id str | None

If provided, returns only messages created after the message with this ID.

None

Returns:

Type Description
list[Message]

List of Message objects ordered by created_at ASC.

get_parts async

get_parts(message_id: str) -> list[RawMessagePart]

Fetch all parts for a message, ordered by part_index ASC.

get_messages_with_parts async

get_messages_with_parts(
    session_id: str, *, since_message_id: str | None = None
) -> list[MessageWithParts]

Efficiently fetch messages and their parts in two queries (no N+1).

Algorithm:

  1. Fetch all matching messages.
  2. Fetch all parts for those message IDs in a single IN query.
  3. Group parts by message ID and join in Python.
  4. Deserialize part JSON into typed MessagePart objects.
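The grouping step (no N+1) can be sketched like this, with illustrative stand-ins for the real row types:

```python
from collections import defaultdict

def group_parts_by_message(message_ids, part_rows):
    """Join messages and parts in Python after two bulk queries.

    'part_rows' is the flat (message_id, part) result of a single IN
    query; each message keeps its (already index-ordered) parts, and
    messages with no parts get an empty list.
    """
    by_msg = defaultdict(list)
    for message_id, part in part_rows:
        by_msg[message_id].append(part)
    return [(m, by_msg.get(m, [])) for m in message_ids]
```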

Parameters:

Name Type Description Default
session_id str

The session to query.

required
since_message_id str | None

If provided, returns only messages after this boundary.

None

Returns:

Type Description
list[MessageWithParts]

List of MessageWithParts in chronological order.

get_messages_with_parts_by_ids async

get_messages_with_parts_by_ids(
    message_ids: list[str],
) -> list[MessageWithParts]

Fetch a specific set of messages (and their parts) by ID.

Returns only the requested messages in the same order as message_ids, making context assembly O(k) in the number of context items rather than O(n) in the total session length.
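Because an IN query may return rows in any order, the store has to re-order results to match the caller's list; a sketch of that step (the row shape here is illustrative):

```python
def order_by_requested_ids(requested_ids, fetched_rows):
    """Re-order database rows to match the caller's message_ids list.

    Rows are keyed by id; requested IDs missing from the result are
    simply skipped rather than raising.
    """
    by_id = {row["id"]: row for row in fetched_rows}
    return [by_id[i] for i in requested_ids if i in by_id]
```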

Parameters:

Name Type Description Default
message_ids list[str]

Ordered list of message IDs to fetch.

required

Returns:

Type Description
list[MessageWithParts]

List of MessageWithParts in the order of message_ids.

get_last_summary_message async

get_last_summary_message(session_id: str) -> Message | None

Return the most recent message with is_summary=True, or None.

Uses the partial index on (session_id, is_summary) for O(log n) lookup.

store_file_reference async

store_file_reference(ref: FileReference) -> FileReference

Insert or replace a file reference (upsert by content_id).

Parameters:

Name Type Description Default
ref FileReference

The FileReference to persist.

required

Returns:

Type Description
FileReference

The stored reference.

get_file_reference async

get_file_reference(content_id: str) -> FileReference | None

Fetch a file reference by content hash. Returns None if not found.

get_file_reference_by_path async

get_file_reference_by_path(
    path: str,
) -> FileReference | None

Fetch the most recently stored file reference for a given path.

get_context_items async

get_context_items(session_id: str) -> list[tuple[str, str]]

Return the ordered context items for a session.

Each item is a (item_type, item_id) pair in ascending position order — the canonical sequence that ContextBuilder assembles into an LLM message list.

Parameters:

Name Type Description Default
session_id str

The session to query.

required

Returns:

Type Description
list[tuple[str, str]]

List of (item_type, item_id) tuples ordered by position ASC.

swap_context_items async

swap_context_items(
    session_id: str,
    remove_item_ids: list[str],
    summary_id: str,
) -> None

Atomically replace a set of context items with a single summary row.

This is the core of the O(1) compaction commit. Within a single transaction, all rows whose item_id is in remove_item_ids are deleted, and one new 'summary' row is inserted at the minimum position of the removed items (it occupies the slot of the first item being removed). Items outside the compacted span keep their original positions; no shifting is required.

Used in two scenarios:

  • Leaf summarisation: remove_item_ids contains the message IDs that were compacted (item_type='message').
  • Condensation: remove_item_ids contains the parent summary node IDs (item_type='summary') being merged.
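The position arithmetic can be sketched in pure Python over (position, item_type, item_id) tuples, a stand-in for the real context_items rows:

```python
def swap_context_items(items, remove_ids, summary_id):
    """Replace a set of context items with one summary row.

    Removed items vanish and a single 'summary' row takes the minimum
    removed position; positions of untouched items are unchanged, so
    no renumbering is needed. Illustrative sketch of the documented
    SQL transaction, not the store's actual implementation.
    """
    removed_positions = [pos for pos, _, item_id in items if item_id in remove_ids]
    if not removed_positions:
        return list(items)
    slot = min(removed_positions)
    kept = [row for row in items if row[2] not in remove_ids]
    kept.append((slot, "summary", summary_id))
    return sorted(kept)
```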

Parameters:

Name Type Description Default
session_id str

The session being compacted.

required
remove_item_ids list[str]

Context item IDs to remove (any item_type).

required
summary_id str

ID of the SummaryNode/condensed node that replaces them.

required

MessageNotFoundError

MessageNotFoundError(message_id: str)

Bases: MnesisStoreError

Raised when a message_id does not exist in the store.

MnesisStoreError

Bases: Exception

Base class for store errors.

PartNotFoundError

PartNotFoundError(part_id: str)

Bases: MnesisStoreError

Raised when a part_id does not exist in the store.

RawMessagePart

RawMessagePart(
    id: str,
    message_id: str,
    session_id: str,
    part_type: str,
    content: str,
    part_index: int = 0,
    tool_name: str | None = None,
    tool_call_id: str | None = None,
    tool_state: str | None = None,
    compacted_at: int | None = None,
    started_at: int | None = None,
    completed_at: int | None = None,
    token_estimate: int = 0,
)

Internal storage model for a message part row.

Session

Session(
    id: str,
    parent_id: str | None,
    created_at: int,
    updated_at: int,
    model_id: str,
    provider_id: str,
    agent: str,
    title: str | None,
    is_active: bool,
    metadata: dict[str, Any] | None,
    system_prompt: str = "",
)

Thin data class for session rows (not Pydantic — avoids heavy validation on reads).

SessionNotFoundError

SessionNotFoundError(session_id: str)

Bases: MnesisStoreError

Raised when a session_id does not exist in the store.

StorePool

StorePool()

Process-scoped registry of open aiosqlite.Connection objects.

Thread-safety: only safe to use from a single asyncio event loop — do not share a StorePool across threads.

For each unique resolved database path the pool holds exactly one connection. Callers may call acquire() concurrently; only the first caller opens the connection, subsequent callers receive the same object.

The pool also manages a per-path asyncio.Lock that ImmutableStore uses to serialise write transactions. SQLite in WAL mode allows unlimited concurrent readers but only one writer; the lock prevents competing writes from hitting "database is locked" errors on busy concurrent workloads.
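The write-serialisation pattern can be sketched with a plain asyncio.Lock; the per-path dict here stands in for the pool's internal registry and the names are illustrative:

```python
import asyncio

async def demo_write_lock():
    """Show two concurrent writers queuing on one per-path lock.

    Each writer's begin/commit pair stays contiguous because only one
    task can hold the lock (i.e. be inside a transaction) at a time.
    """
    locks: dict[str, asyncio.Lock] = {}
    lock = locks.setdefault("/tmp/demo.db", asyncio.Lock())
    order: list[str] = []

    async def write(tag: str) -> None:
        async with lock:            # only one writer inside at a time
            order.append(f"{tag}:begin")
            await asyncio.sleep(0)  # simulated transaction body
            order.append(f"{tag}:commit")

    await asyncio.gather(write("a"), write("b"))
    return order
```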

acquire async

acquire(
    db_path: str,
    *,
    wal_mode: bool = True,
    connection_timeout: float = 30.0,
) -> aiosqlite.Connection

Return the shared connection for db_path, opening it if needed.

Parameters:

Name Type Description Default
db_path str

Resolved (~ expanded) absolute path to the database file.

required
wal_mode bool

Enable WAL journal mode on first open.

True
connection_timeout float

SQLite busy timeout in seconds.

30.0

Returns:

Type Description
Connection

The shared aiosqlite.Connection for this path.

write_lock

write_lock(db_path: str) -> asyncio.Lock

Return the write-serialisation lock for db_path.

The lock must already exist (i.e. acquire() must have been called for this path). Raises KeyError if called before acquire().

close_path async

close_path(db_path: str) -> None

Close and remove the connection for a single path.

close_all async

close_all() -> None

Close every connection managed by this pool.

default staticmethod

default() -> StorePool

Return the process-level default pool.

The default pool is created lazily on first access and lives for the lifetime of the process. It is suitable for production use; tests should create their own StorePool() instances to get full isolation.

SummaryDAGStore

SummaryDAGStore(store: ImmutableStore)

Manages the logical DAG of summary nodes.

Summary nodes are persisted to two places:

  1. The messages table (is_summary=True) — read by ContextBuilder to inject summaries into the active context window.
  2. The summary_nodes table — stores the full DAG: kind, parent_node_ids (JSON array), and superseded flag. This is the source of truth for DAG relationships and survives session restarts.

Superseded nodes are tracked in an in-memory set (_superseded_ids) for fast within-session filtering, and persisted to summary_nodes.superseded so the state is recoverable after a process restart.
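The two-layer filter (persisted flag plus in-memory set) can be sketched as follows; the row shape and names are illustrative:

```python
def active_nodes(db_rows, in_memory_superseded):
    """Filter summary nodes through both supersession layers.

    'db_rows' are (node_id, superseded) pairs as persisted in
    summary_nodes; the in-memory set catches nodes superseded in the
    current session but not yet visible in the query results.
    """
    return [
        node_id
        for node_id, superseded in db_rows
        if not superseded and node_id not in in_memory_superseded
    ]
```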

mark_superseded async

mark_superseded(node_ids: list[str]) -> None

Mark one or more summary nodes as superseded by a condensed node.

Updates both the in-memory set (for fast within-session filtering) and the summary_nodes table (so state survives a restart).

Parameters:

Name Type Description Default
node_ids list[str]

IDs of the summary nodes consumed by a condensation.

required

get_active_nodes async

get_active_nodes(session_id: str) -> list[SummaryNode]

Return all active (non-superseded) summary nodes for a session.

Queries summary_nodes WHERE superseded=0 to recover DAG state after a restart, then populates each node's content from the corresponding message parts. Also applies the in-memory superseded set for nodes marked superseded within the current session but not yet flushed (should not happen in practice, but guards against concurrent writes).

Parameters:

Name Type Description Default
session_id str

The session to query.

required

Returns:

Type Description
list[SummaryNode]

List of SummaryNode objects in creation order (created_at ASC).

get_latest_node async

get_latest_node(session_id: str) -> SummaryNode | None

Return the most recently created active summary node, or None.

Reads from summary_nodes to respect DAG supersession correctly, including across restarts.

Parameters:

Name Type Description Default
session_id str

The session to query.

required

Returns:

Type Description
SummaryNode | None

The latest active SummaryNode, or None.

get_coverage_gaps async

get_coverage_gaps(session_id: str) -> list[MessageSpan]

Return spans of messages not covered by any summary node.

In Phase 1, there is at most one summary node (the most recent), so the only gap is the tail of messages after that node.
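The Phase-1 gap computation reduces to finding the tail after the last covered message; a sketch with illustrative names:

```python
def tail_gap(message_ids, covered_up_to):
    """Return the uncovered tail of a session's message IDs.

    With at most one summary node, the only gap is everything after
    'covered_up_to' (the last message the node covers); if nothing is
    summarised yet, the whole session is the gap.
    """
    if covered_up_to is None:
        return list(message_ids)  # nothing summarised yet
    cut = message_ids.index(covered_up_to) + 1
    return message_ids[cut:]
```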

Parameters:

Name Type Description Default
session_id str

The session to query.

required

Returns:

Type Description
list[MessageSpan]

List of MessageSpan objects representing uncovered message ranges.

insert_node async

insert_node(
    node: SummaryNode, *, id_generator: Callable[[], str]
) -> SummaryNode

Persist a new summary node.

Writes:

  • A Message with is_summary=True (read by ContextBuilder).
  • A TextPart with the summary content.
  • A CompactionMarkerPart with metadata.
  • A row in summary_nodes with kind, parent_node_ids (JSON), and superseded=0 (the DAG persistence added in Phase 3).

Parameters:

Name Type Description Default
node SummaryNode

The SummaryNode to persist. node.id must be a valid message ID generated by make_id("msg").

required
id_generator Callable[[], str]

Callable that returns new unique part IDs.

required

Returns:

Type Description
SummaryNode

The persisted node (unmodified).

get_node_by_id async

get_node_by_id(node_id: str) -> SummaryNode | None

Fetch a specific summary node by ID.

Parameters:

Name Type Description Default
node_id str

The message ID of the summary node.

required

Returns:

Type Description
SummaryNode | None

The SummaryNode, or None if not found or not a summary.