mnesis.store
Mnesis persistence layer.
DuplicateIDError
ImmutableFieldError
Bases: MnesisStoreError
Raised when attempting to modify an immutable field.
ImmutableStore
Append-only, SQLite-backed message log.
All writes use transactions. Part content is immutable; only status
metadata (tool_state, compacted_at, the started_at/completed_at timing
fields, output and error_message) can be updated, and only via
update_part_status().
When a StorePool is supplied, the store borrows a shared connection
from it. close() releases a private (non-pooled) connection but leaves
pooled connections open, because the pool owns their lifetime. This lets
many concurrent MnesisSession objects share one physical SQLite
connection without "database is locked" errors, and should map cleanly
onto a PostgreSQL connection pool in the future.
Usage (standalone)::

    store = ImmutableStore(StoreConfig())
    await store.initialize()
    try:
        # create_session takes the new session's ID (a str) and returns a Session;
        # session_id and msg are supplied by the caller.
        session = await store.create_session(session_id)
        await store.append_message(msg)
    finally:
        await store.close()  # closes the private connection

Usage (with pool)::

    pool = StorePool()
    store_a = ImmutableStore(config, pool=pool)
    store_b = ImmutableStore(config, pool=pool)
    await store_a.initialize()  # opens the shared connection once
    await store_b.initialize()  # reuses it
    await store_a.close()  # no-op: the pool owns the connection
    await store_b.close()  # no-op
    await pool.close_all()  # actually closes the connection
initialize
async
Open (or borrow) a database connection and apply the schema.
When a pool is set, the connection is acquired from the pool and the
schema is applied idempotently (CREATE TABLE IF NOT EXISTS).
When no pool is set, a private connection is opened; the caller is
responsible for calling close() to release it.
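Because the schema uses CREATE TABLE IF NOT EXISTS, applying it a second time is harmless, which is what lets several pooled stores each run initialize() against one shared connection. A minimal sketch with the standard-library sqlite3 module (synchronous, unlike the async store); the two tables and their columns are hypothetical simplifications, not the real mnesis schema:

```python
import sqlite3

# Hypothetical, heavily simplified tables; the real mnesis schema is not shown here.
SCHEMA = """
CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,
    created_at INTEGER NOT NULL,
    is_active INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS messages (
    id TEXT PRIMARY KEY,
    session_id TEXT NOT NULL REFERENCES sessions(id),
    created_at INTEGER NOT NULL
);
"""


def apply_schema(conn: sqlite3.Connection) -> None:
    """Apply the schema; safe to run every time a store initializes."""
    # IF NOT EXISTS turns a repeated CREATE TABLE into a no-op instead of an error.
    conn.executescript(SCHEMA)


conn = sqlite3.connect(":memory:")
apply_schema(conn)  # first store: creates the tables
apply_schema(conn)  # second store sharing the connection: nothing to do
tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
print(tables)  # ['messages', 'sessions']
```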
Raises:
| Type | Description |
|---|---|
| Error | If the database cannot be opened or the schema cannot be applied. |
close
async
Release the database connection.
If the connection is owned by a pool this is a no-op — the pool
manages the connection lifetime. If the connection is private
(no pool was supplied) it is closed and set to None.
create_session
async
create_session(
id: str,
*,
model_id: str = "",
provider_id: str = "",
agent: str = "default",
parent_id: str | None = None,
title: str | None = None,
metadata: dict[str, Any] | None = None,
system_prompt: str = "",
) -> Session
Insert a new session row.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | str | ULID-based session ID (e.g. …). | required |
| model_id | str | The model string used by this session. | '' |
| provider_id | str | The provider (e.g. …). | '' |
| agent | str | Agent role name. | 'default' |
| parent_id | str \| None | Optional parent session ID for sub-sessions. | None |
| title | str \| None | Optional human-readable title. | None |
| metadata | dict[str, Any] \| None | Optional JSON-serializable metadata dict. | None |
| system_prompt | str | System prompt for this session. Persisted so that … | '' |
Returns:
| Type | Description |
|---|---|
| Session | The created Session. |
Raises:
| Type | Description |
|---|---|
| DuplicateIDError | If a session with this ID already exists. |
get_session
async
Fetch a session by ID.
Raises:
| Type | Description |
|---|---|
| SessionNotFoundError | If no session with this ID exists. |
list_sessions
async
list_sessions(
*,
parent_id: str | None = None,
active_only: bool = True,
limit: int = 100,
offset: int = 0,
) -> list[Session]
List sessions, newest first.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parent_id | str \| None | Filter to sessions with this parent (sub-sessions). | None |
| active_only | bool | Exclude soft-deleted sessions when True. | True |
| limit | int | Maximum number of sessions to return. | 100 |
| offset | int | Number of sessions to skip (for pagination). | 0 |
Returns:
| Type | Description |
|---|---|
| list[Session] | List of Session objects ordered by created_at DESC. |
soft_delete_session
async
Mark a session as inactive (is_active=0). Messages and parts are retained.
append_message
async
Append a message to the log.
For non-summary messages a context_items row is inserted atomically
in the same transaction so that the context-assembly view stays
consistent with the message log. Summary messages are NOT inserted
here — the compaction engine inserts their context_items row
separately as part of the atomic context swap.
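A minimal sketch of the atomic append described above, using synchronous stdlib sqlite3 rather than the async store. The table layouts and the choice to append at MAX(position) + 1 are assumptions for illustration; only the idea of writing the message row and its context_items row in one transaction, and skipping summaries, comes from the text:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE messages (
        id TEXT PRIMARY KEY, session_id TEXT NOT NULL, is_summary INTEGER NOT NULL
    );
    CREATE TABLE context_items (
        session_id TEXT NOT NULL, item_type TEXT NOT NULL,
        item_id TEXT NOT NULL, position INTEGER NOT NULL
    );
    """
)


def append_message(conn: sqlite3.Connection, msg_id: str, session_id: str,
                   is_summary: bool = False) -> None:
    # Both INSERTs run in one transaction: "with conn" commits if the block
    # succeeds and rolls back if either statement raises.
    with conn:
        conn.execute(
            "INSERT INTO messages (id, session_id, is_summary) VALUES (?, ?, ?)",
            (msg_id, session_id, int(is_summary)),
        )
        if not is_summary:
            # Summary messages are skipped; compaction inserts their context row.
            # Assumption: new items are appended at MAX(position) + 1.
            conn.execute(
                "INSERT INTO context_items (session_id, item_type, item_id, position) "
                "SELECT ?, 'message', ?, COALESCE(MAX(position), -1) + 1 "
                "FROM context_items WHERE session_id = ?",
                (session_id, msg_id, session_id),
            )


append_message(conn, "m1", "s1")
append_message(conn, "m2", "s1")
append_message(conn, "sum1", "s1", is_summary=True)
try:
    append_message(conn, "m1", "s1")  # duplicate ID: the whole transaction fails
except sqlite3.IntegrityError:
    pass
items = conn.execute(
    "SELECT item_type, item_id FROM context_items ORDER BY position"
).fetchall()
print(items)  # [('message', 'm1'), ('message', 'm2')]
```

A failed insert rolls back both rows, so context_items never references a message that was not stored.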
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | The Message to persist. Must have a pre-generated id. | required |
Returns:
| Type | Description |
|---|---|
| Message | The stored message (unmodified). |
Raises:
| Type | Description |
|---|---|
| SessionNotFoundError | If session_id does not exist. |
| DuplicateIDError | If a message with this ID already exists. |
append_part
async
Append a single part to a message.
Assigns part_index as one more than the highest existing part_index for the same message, computed within a transaction.
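A sketch of the part_index rule, assuming a synchronous stdlib sqlite3 connection and a hypothetical parts table; it also assumes the first part of a message receives index 0, which the text does not state:

```python
import sqlite3

# isolation_level=None: no implicit transactions, so BEGIN/COMMIT below are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE parts (id TEXT PRIMARY KEY, message_id TEXT NOT NULL, "
    "part_index INTEGER NOT NULL, content TEXT NOT NULL)"
)


def append_part(conn: sqlite3.Connection, part_id: str, message_id: str,
                content: str) -> int:
    # BEGIN IMMEDIATE takes SQLite's write lock before reading MAX(part_index),
    # so no other connection can claim the same index in between.
    conn.execute("BEGIN IMMEDIATE")
    try:
        # Assumption: the first part of a message gets part_index 0.
        (next_index,) = conn.execute(
            "SELECT COALESCE(MAX(part_index), -1) + 1 FROM parts WHERE message_id = ?",
            (message_id,),
        ).fetchone()
        conn.execute(
            "INSERT INTO parts (id, message_id, part_index, content) VALUES (?, ?, ?, ?)",
            (part_id, message_id, next_index, content),
        )
    except Exception:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")
    return next_index


first = [append_part(conn, f"p{i}", "m1", "{}") for i in range(3)]
other = append_part(conn, "q0", "m2", "{}")
print(first, other)  # [0, 1, 2] 0
```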
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| part | RawMessagePart | The RawMessagePart to persist. | required |
Returns:
| Type | Description |
|---|---|
| RawMessagePart | The stored part with its assigned part_index. |
Raises:
| Type | Description |
|---|---|
| MessageNotFoundError | If message_id does not exist. |
update_part_status
async
update_part_status(
part_id: str,
*,
tool_state: str | None = None,
compacted_at: int | None = None,
started_at: int | None = None,
completed_at: int | None = None,
output: str | None = None,
error_message: str | None = None,
) -> None
Update mutable status fields on a part.
This is the only permitted mutation of persisted data.
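A sketch of a status-only update, under several assumptions: a hypothetical parts table, stdlib sqlite3 instead of the async store, LookupError standing in for PartNotFoundError, and made-up JSON key names for the merged output and error_message. It shows the two rules from the text: only status columns are written, and tool output is merged into the existing content JSON rather than replacing it:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit transactions
conn.execute(
    "CREATE TABLE parts (id TEXT PRIMARY KEY, content TEXT NOT NULL, tool_state TEXT, "
    "compacted_at INTEGER, started_at INTEGER, completed_at INTEGER)"
)
conn.execute(
    "INSERT INTO parts (id, content) VALUES (?, ?)",
    ("p1", json.dumps({"tool_name": "grep", "input": {"pattern": "TODO"}})),
)


def update_part_status(conn, part_id, *, tool_state=None, compacted_at=None,
                       started_at=None, completed_at=None, output=None,
                       error_message=None):
    # Only these status columns can change; None means "leave as is".
    status = {"tool_state": tool_state, "compacted_at": compacted_at,
              "started_at": started_at, "completed_at": completed_at}
    updates = {col: val for col, val in status.items() if val is not None}
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute("SELECT content FROM parts WHERE id = ?", (part_id,)).fetchone()
        if row is None:
            raise LookupError(f"part not found: {part_id}")  # stand-in for PartNotFoundError
        content = json.loads(row[0])
        # Hypothetical key names: output/error_message are added to the JSON,
        # while the original keys (tool_name, input) are kept untouched.
        if output is not None:
            content["output"] = output
        if error_message is not None:
            content["error_message"] = error_message
        updates["content"] = json.dumps(content)
        # Column names come from the fixed dict above, never from caller input.
        assignments = ", ".join(f"{col} = ?" for col in updates)
        conn.execute(f"UPDATE parts SET {assignments} WHERE id = ?",
                     (*updates.values(), part_id))
    except Exception:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")


update_part_status(conn, "p1", tool_state="completed",
                   completed_at=1_700_000_000_000, output="3 matches")
state, raw = conn.execute("SELECT tool_state, content FROM parts WHERE id = 'p1'").fetchone()
print(state, json.loads(raw))
```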
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| part_id | str | The part to update. | required |
| tool_state | str \| None | New tool lifecycle state. | None |
| compacted_at | int \| None | Unix timestamp in milliseconds; setting it marks the part with the pruning tombstone. | None |
| started_at | int \| None | Tool execution start timestamp. | None |
| completed_at | int \| None | Tool execution end timestamp. | None |
| output | str \| None | Tool result output string (merged into the content JSON). | None |
| error_message | str \| None | Tool error message (merged into the content JSON). | None |
Raises:
| Type | Description |
|---|---|
| PartNotFoundError | If the part does not exist. |
update_message_tokens
async
update_message_tokens(
message_id: str,
tokens: TokenUsage,
cost: float,
finish_reason: str,
) -> None
Update token usage fields on an assistant message after streaming completes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | The message to update. | required |
| tokens | TokenUsage | Final token usage from the provider response. | required |
| cost | float | Dollar cost of the response. | required |
| finish_reason | str | Provider finish reason (e.g. …). | required |
get_message
async
Fetch a single message by ID.
Raises:
| Type | Description |
|---|---|
| MessageNotFoundError | If no message with this ID exists. |
get_messages
async
Fetch all messages for a session in chronological order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |
| since_message_id | str \| None | If provided, returns only messages created after the message with this ID. | None |
Returns:
| Type | Description |
|---|---|
| list[Message] | List of Message objects ordered by created_at ASC. |
get_parts
async
Fetch all parts for a message, ordered by part_index ASC.
get_messages_with_parts
async
get_messages_with_parts(
session_id: str, *, since_message_id: str | None = None
) -> list[MessageWithParts]
Efficiently fetch messages and their parts in two queries (no N+1).
Algorithm:
1. Fetch all matching messages.
2. Fetch all parts for those message IDs in a single IN query.
3. Group parts by message ID and join them in Python.
4. Deserialize part JSON into typed MessagePart objects.
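The four steps can be sketched with stdlib sqlite3 against hypothetical messages and parts tables. Step 4 is left out, so parts stay as raw strings; the real method deserializes them into MessagePart objects:

```python
import sqlite3
from collections import defaultdict

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE messages (id TEXT PRIMARY KEY, session_id TEXT, created_at INTEGER);
    CREATE TABLE parts (id TEXT PRIMARY KEY, message_id TEXT, part_index INTEGER,
                        content TEXT);
    INSERT INTO messages VALUES ('m1', 's1', 1), ('m2', 's1', 2), ('x1', 's2', 3);
    INSERT INTO parts VALUES ('p2', 'm1', 1, 'b'), ('p1', 'm1', 0, 'a'),
                             ('p3', 'm2', 0, 'c');
    """
)


def get_messages_with_parts(conn, session_id):
    # Step 1: one query for the session's messages, oldest first.
    messages = conn.execute(
        "SELECT id FROM messages WHERE session_id = ? ORDER BY created_at",
        (session_id,),
    ).fetchall()
    ids = [row[0] for row in messages]
    if not ids:
        return []
    # Step 2: one IN (...) query for every part of those messages.
    placeholders = ", ".join("?" for _ in ids)
    rows = conn.execute(
        f"SELECT message_id, content FROM parts WHERE message_id IN ({placeholders}) "
        "ORDER BY part_index",
        ids,
    ).fetchall()
    # Step 3: group in Python rather than querying once per message (the N+1 pattern).
    parts_by_message = defaultdict(list)
    for message_id, content in rows:
        parts_by_message[message_id].append(content)
    # Step 4 (JSON -> typed MessagePart) is omitted; parts stay raw strings here.
    return [(mid, parts_by_message[mid]) for mid in ids]


print(get_messages_with_parts(conn, "s1"))  # [('m1', ['a', 'b']), ('m2', ['c'])]
```

With many messages, the ID list may need to be split into chunks to stay under SQLite's limit on host parameters per statement.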
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |
| since_message_id | str \| None | If provided, returns only messages after this boundary. | None |
Returns:
| Type | Description |
|---|---|
| list[MessageWithParts] | List of MessageWithParts in chronological order. |
get_messages_with_parts_by_ids
async
Fetch a specific set of messages (and their parts) by ID.
Returns only the requested messages in the same order as message_ids,
making context assembly O(k) in the number of context items rather than
O(n) in the total session length.
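The order-preserving fetch can be sketched as an IN query followed by a reorder in Python, since SQL does not promise to return rows in the order of the IN list. Stdlib sqlite3, the one-table layout, and silently skipping unknown IDs are all assumptions of this sketch:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (id TEXT PRIMARY KEY, body TEXT)")
conn.executemany(
    "INSERT INTO messages VALUES (?, ?)", [(f"m{i}", f"body {i}") for i in range(1000)]
)


def get_messages_by_ids(conn, message_ids):
    if not message_ids:
        return []
    placeholders = ", ".join("?" for _ in message_ids)
    # Primary-key lookups touch only the k requested rows, whatever the table size,
    # but SQL gives no guarantee about the order they come back in ...
    rows = conn.execute(
        f"SELECT id, body FROM messages WHERE id IN ({placeholders})", message_ids
    ).fetchall()
    by_id = dict(rows)
    # ... so rebuild the caller's order. Assumption: unknown IDs are skipped.
    return [(mid, by_id[mid]) for mid in message_ids if mid in by_id]


print(get_messages_by_ids(conn, ["m42", "m7", "m999"]))
# [('m42', 'body 42'), ('m7', 'body 7'), ('m999', 'body 999')]
```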
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_ids | list[str] | Ordered list of message IDs to fetch. | required |
Returns:
| Type | Description |
|---|---|
| list[MessageWithParts] | List of MessageWithParts in the order of message_ids. |
get_last_summary_message
async
Return the most recent message with is_summary=True, or None.
Uses the partial index on (session_id, is_summary) for O(log n) lookup.
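A sketch of how such a partial index might be declared and queried, with stdlib sqlite3. The index name and its WHERE is_summary = 1 predicate are assumptions; the text only says the index covers (session_id, is_summary):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE messages (
        id TEXT PRIMARY KEY, session_id TEXT NOT NULL,
        created_at INTEGER NOT NULL, is_summary INTEGER NOT NULL DEFAULT 0
    );
    -- Partial index: only summary rows are indexed, so it stays small.
    CREATE INDEX idx_messages_summary ON messages (session_id, is_summary)
        WHERE is_summary = 1;
    """
)
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?, ?)",
    [("m1", "s1", 1, 0), ("sum1", "s1", 2, 1), ("m2", "s1", 3, 0),
     ("sum2", "s1", 4, 1), ("m3", "s1", 5, 0)],
)


def get_last_summary_id(conn, session_id):
    # The query repeats the index predicate (is_summary = 1) so SQLite's planner
    # is allowed to use the partial index.
    row = conn.execute(
        "SELECT id FROM messages WHERE session_id = ? AND is_summary = 1 "
        "ORDER BY created_at DESC LIMIT 1",
        (session_id,),
    ).fetchone()
    return row[0] if row else None


print(get_last_summary_id(conn, "s1"), get_last_summary_id(conn, "s2"))  # sum2 None
```

SQLite only considers a partial index when the query's WHERE clause implies the index's own WHERE clause, which is why the query repeats is_summary = 1.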
store_file_reference
async
Insert or replace a file reference (upsert by content_id).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ref | FileReference | The FileReference to persist. | required |
Returns:
| Type | Description |
|---|---|
| FileReference | The stored reference. |
get_file_reference
async
Fetch a file reference by content hash. Returns None if not found.
get_file_reference_by_path
async
Fetch the most recently stored file reference for a given path.
get_context_items
async
Return the ordered context items for a session.
Each item is a (item_type, item_id) pair in ascending position
order — the canonical sequence that ContextBuilder assembles into
an LLM message list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |
Returns:
| Type | Description |
|---|---|
| list[tuple[str, str]] | List of (item_type, item_id) tuples in ascending position order. |
swap_context_items
async
Atomically replace a set of context items with a single summary row.
This is the core of the O(1) compaction commit: within a single
transaction, all rows whose item_id is in remove_item_ids are
deleted and one new 'summary' row is inserted at the minimum
position of the removed items (i.e. it occupies the slot of the first
item being removed); items outside the compacted span keep their
original positions — no shifting required.
Used in two scenarios:
- Leaf summarisation: remove_item_ids contains the message IDs that were compacted (item_type='message').
- Condensation: remove_item_ids contains the parent summary node IDs (item_type='summary') being merged.
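The swap can be sketched with stdlib sqlite3 as a single BEGIN IMMEDIATE transaction that finds the minimum position, deletes the compacted rows, and inserts the summary in that slot. The table layout and raising ValueError when nothing matches are assumptions; the demo runs two leaf summarisations followed by a condensation:

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit transactions
conn.execute(
    "CREATE TABLE context_items (session_id TEXT, item_type TEXT, item_id TEXT, "
    "position INTEGER)"
)
conn.executemany(
    "INSERT INTO context_items VALUES ('s1', 'message', ?, ?)",
    [(f"m{i}", i) for i in range(6)],
)


def swap_context_items(conn, session_id, remove_item_ids, summary_id):
    placeholders = ", ".join("?" for _ in remove_item_ids)
    match = f"session_id = ? AND item_id IN ({placeholders})"
    params = (session_id, *remove_item_ids)
    conn.execute("BEGIN IMMEDIATE")  # readers never observe a half-applied swap
    try:
        (slot,) = conn.execute(
            f"SELECT MIN(position) FROM context_items WHERE {match}", params
        ).fetchone()
        if slot is None:
            raise ValueError("no matching context items")  # assumed behaviour
        conn.execute(f"DELETE FROM context_items WHERE {match}", params)
        # The summary takes the first removed item's slot; rows outside the span
        # keep their positions, so nothing else is rewritten.
        conn.execute(
            "INSERT INTO context_items VALUES (?, 'summary', ?, ?)",
            (session_id, summary_id, slot),
        )
    except Exception:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")


def context(conn):
    return conn.execute(
        "SELECT item_type, item_id FROM context_items ORDER BY position"
    ).fetchall()


swap_context_items(conn, "s1", ["m1", "m2", "m3"], "S1")  # leaf summarisation
swap_context_items(conn, "s1", ["m4", "m5"], "S2")        # another leaf
swap_context_items(conn, "s1", ["S1", "S2"], "C1")        # condensation
print(context(conn))  # [('message', 'm0'), ('summary', 'C1')]
```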
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session being compacted. | required |
| remove_item_ids | list[str] | Context item IDs to remove (any item_type). | required |
| summary_id | str | ID of the SummaryNode/condensed node that replaces them. | required |
MessageNotFoundError
MnesisStoreError
Bases: Exception
Base class for store errors.
PartNotFoundError
RawMessagePart
RawMessagePart(
id: str,
message_id: str,
session_id: str,
part_type: str,
content: str,
part_index: int = 0,
tool_name: str | None = None,
tool_call_id: str | None = None,
tool_state: str | None = None,
compacted_at: int | None = None,
started_at: int | None = None,
completed_at: int | None = None,
token_estimate: int = 0,
)
Internal storage model for a message part row.
Session
Session(
id: str,
parent_id: str | None,
created_at: int,
updated_at: int,
model_id: str,
provider_id: str,
agent: str,
title: str | None,
is_active: bool,
metadata: dict[str, Any] | None,
system_prompt: str = "",
)
Thin data class for session rows (not Pydantic — avoids heavy validation on reads).
SessionNotFoundError
StorePool
Process-scoped registry of open aiosqlite.Connection objects.
Thread-safety: only safe to use from a single asyncio event loop — do not
share a StorePool across threads.
For each unique resolved database path the pool holds exactly one
connection. Callers may call acquire() concurrently; only the first
caller opens the connection, and subsequent callers receive the same object.
The pool also manages a per-path asyncio.Lock that
ImmutableStore uses to serialise write transactions. SQLite in WAL
mode allows many concurrent readers but only one writer at a time; the
lock keeps competing writes from failing with "database is locked"
errors under busy concurrent workloads.
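A minimal sketch of the pooling idea: one connection per path, first-open guarded by a lock, and a per-path write lock. MiniPool is a hypothetical class built on stdlib sqlite3 rather than aiosqlite, so its calls block the event loop; it is not StorePool's actual implementation:

```python
import asyncio
import os
import sqlite3
import tempfile


class MiniPool:
    """Hypothetical stand-in for StorePool, using stdlib sqlite3 instead of aiosqlite."""

    def __init__(self) -> None:
        self._conns: dict[str, sqlite3.Connection] = {}
        self._write_locks: dict[str, asyncio.Lock] = {}
        self._open_lock = asyncio.Lock()  # so only the first caller opens a path

    async def acquire(self, db_path: str) -> sqlite3.Connection:
        async with self._open_lock:
            if db_path not in self._conns:
                conn = sqlite3.connect(db_path, timeout=30.0)  # busy timeout (s)
                conn.execute("PRAGMA journal_mode=WAL")
                self._conns[db_path] = conn
                self._write_locks[db_path] = asyncio.Lock()
        return self._conns[db_path]

    def write_lock(self, db_path: str) -> asyncio.Lock:
        return self._write_locks[db_path]  # KeyError if acquire() was never called

    async def close_all(self) -> None:
        for conn in self._conns.values():
            conn.close()
        self._conns.clear()
        self._write_locks.clear()


async def main() -> tuple[bool, int]:
    pool = MiniPool()  # created inside the running event loop
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "mnesis.db")
        a, b = await asyncio.gather(pool.acquire(path), pool.acquire(path))
        async with pool.write_lock(path):  # one writer at a time per path
            with a:  # commit on success
                a.execute("CREATE TABLE IF NOT EXISTS t (x INTEGER)")
                a.execute("INSERT INTO t VALUES (1)")
        (count,) = b.execute("SELECT COUNT(*) FROM t").fetchone()
        await pool.close_all()
    return a is b, count


result = asyncio.run(main())
print(result)  # (True, 1)
```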
acquire
async
acquire(
db_path: str,
*,
wal_mode: bool = True,
connection_timeout: float = 30.0,
) -> aiosqlite.Connection
Return the shared connection for db_path, opening it if needed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db_path | str | Resolved absolute path to the database file, with ~ already expanded. | required |
| wal_mode | bool | Enable WAL journal mode when the connection is first opened. | True |
| connection_timeout | float | SQLite busy timeout, in seconds. | 30.0 |
Returns:
| Type | Description |
|---|---|
| Connection | The shared aiosqlite.Connection for db_path. |
write_lock
Return the write-serialisation lock for db_path.
The lock must already exist (i.e. acquire() must have been called
for this path). Raises KeyError if called before acquire().
close_path
async
Close and remove the connection for a single path.
default
staticmethod
Return the process-level default pool.
The default pool is created lazily on first access and lives for the
lifetime of the process. It is suitable for production use; tests
should create their own StorePool() instances to get full isolation.
SummaryDAGStore
Manages the logical DAG of summary nodes.
Summary nodes are persisted to two places:
- The messages table (is_summary=True), read by ContextBuilder to inject summaries into the active context window.
- The summary_nodes table, which stores the full DAG: kind, parent_node_ids (JSON array), and the superseded flag. This is the source of truth for DAG relationships and survives session restarts.
Superseded nodes are tracked in an in-memory set (_superseded_ids) for
fast within-session filtering, and persisted to summary_nodes.superseded
so the state is recoverable after a process restart.
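A sketch of the dual bookkeeping (in-memory set plus persisted flag), using a hypothetical MiniDAG class and a simplified summary_nodes table; the session_id and created_at columns and the 'leaf'/'condensed' kind values are assumptions:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE summary_nodes (id TEXT PRIMARY KEY, session_id TEXT, kind TEXT, "
    "parent_node_ids TEXT, created_at INTEGER, superseded INTEGER NOT NULL DEFAULT 0)"
)


class MiniDAG:
    """Hypothetical sketch of the supersession bookkeeping, not SummaryDAGStore itself."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self._superseded_ids: set[str] = set()  # fast in-process filter

    def insert_node(self, node_id, session_id, created_at, kind, parents=()):
        with self.conn:
            self.conn.execute(
                "INSERT INTO summary_nodes VALUES (?, ?, ?, ?, ?, 0)",
                (node_id, session_id, kind, json.dumps(list(parents)), created_at),
            )

    def mark_superseded(self, node_ids):
        self._superseded_ids.update(node_ids)
        with self.conn:  # persisted, so a restarted process sees the same state
            self.conn.executemany(
                "UPDATE summary_nodes SET superseded = 1 WHERE id = ?",
                [(node_id,) for node_id in node_ids],
            )

    def active_node_ids(self, session_id):
        rows = self.conn.execute(
            "SELECT id FROM summary_nodes WHERE session_id = ? AND superseded = 0 "
            "ORDER BY created_at",
            (session_id,),
        ).fetchall()
        # Also drop anything superseded in memory but not yet persisted.
        return [node_id for (node_id,) in rows if node_id not in self._superseded_ids]


dag = MiniDAG(conn)
dag.insert_node("n1", "s1", 1, "leaf")
dag.insert_node("n2", "s1", 2, "leaf")
dag.insert_node("c1", "s1", 3, "condensed", parents=["n1", "n2"])
dag.mark_superseded(["n1", "n2"])
before_restart = dag.active_node_ids("s1")
after_restart = MiniDAG(conn).active_node_ids("s1")  # empty in-memory set
print(before_restart, after_restart)  # ['c1'] ['c1']
```

The second, freshly constructed MiniDAG starts with an empty in-memory set yet still reports only c1, which is the point of persisting the flag.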
mark_superseded
async
Mark one or more summary nodes as superseded by a condensed node.
Updates both the in-memory set (for fast within-session filtering) and
the summary_nodes table (so state survives a restart).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node_ids | list[str] | IDs of the summary nodes consumed by a condensation. | required |
get_active_nodes
async
Return all active (non-superseded) summary nodes for a session.
Queries summary_nodes WHERE superseded=0 to recover DAG state after
a restart, then populates each node's content from the corresponding
message parts. It also filters out nodes in the in-memory superseded set,
i.e. nodes marked superseded in the current session whose database update
has not yet been flushed (this should not happen in practice, but guards
against concurrent writes).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |
Returns:
| Type | Description |
|---|---|
| list[SummaryNode] | List of SummaryNode objects in creation order (created_at ASC). |
get_latest_node
async
Return the most recently created active summary node, or None.
Reads from summary_nodes to respect DAG supersession correctly,
including across restarts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |
Returns:
| Type | Description |
|---|---|
| SummaryNode \| None | The latest active SummaryNode, or None. |
get_coverage_gaps
async
Return spans of messages not covered by any summary node.
In Phase 1, there is at most one summary node (the most recent), so the only gap is the tail of messages after that node.
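The Phase 1 rule reduces to a slice of the chronological message list. In this sketch Span is a hypothetical stand-in for MessageSpan, and treating every message as uncovered when no summary exists yet is an assumption:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Span:
    """Hypothetical stand-in for MessageSpan; the real fields may differ."""

    start_message_id: str
    end_message_id: str


def coverage_gaps(messages: list[tuple[str, bool]]) -> list[Span]:
    """Phase 1 rule: the only uncovered span is the tail after the latest summary.

    messages is the session's chronological list of (message_id, is_summary) pairs.
    """
    summary_positions = [i for i, (_, is_summary) in enumerate(messages) if is_summary]
    # Assumption: with no summary yet, every message is uncovered.
    start = summary_positions[-1] + 1 if summary_positions else 0
    tail = [message_id for message_id, _ in messages[start:]]
    return [Span(tail[0], tail[-1])] if tail else []


history = [("m1", False), ("m2", False), ("sum1", True), ("m3", False), ("m4", False)]
print(coverage_gaps(history))  # [Span(start_message_id='m3', end_message_id='m4')]
```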
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |
Returns:
| Type | Description |
|---|---|
| list[MessageSpan] | List of MessageSpan objects representing uncovered message ranges. |
insert_node
async
Persist a new summary node.
Writes:
- A Message with is_summary=True (read by ContextBuilder).
- A TextPart with the summary content.
- A CompactionMarkerPart with metadata.
- A row in summary_nodes with kind, parent_node_ids (JSON),
and superseded=0 (the DAG persistence added in Phase 3).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node | SummaryNode | The SummaryNode to persist. | required |
| id_generator | Callable[[], str] | Callable that returns new unique part IDs. | required |
Returns:
| Type | Description |
|---|---|
| SummaryNode | The persisted node (unmodified). |
get_node_by_id
async
Fetch a specific summary node by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| node_id | str | The message ID of the summary node. | required |
Returns:
| Type | Description |
|---|---|
| SummaryNode \| None | The SummaryNode, or None if not found or not a summary. |