mnesis.store.immutable
Append-only SQLite-backed message store.
MnesisStoreError
Bases: Exception
Base class for store errors.
SessionNotFoundError
Raised when no session with the given ID exists.
MessageNotFoundError
Raised when no message with the given ID exists.
PartNotFoundError
Raised when no part with the given ID exists.
DuplicateIDError
Raised when an insert collides with an existing ID.
ImmutableFieldError
Bases: MnesisStoreError
Raised when attempting to modify an immutable field.
Session
Session(
id: str,
parent_id: str | None,
created_at: int,
updated_at: int,
model_id: str,
provider_id: str,
agent: str,
title: str | None,
is_active: bool,
metadata: dict[str, Any] | None,
system_prompt: str = "",
)
Thin data class for session rows (not Pydantic — avoids heavy validation on reads).
RawMessagePart
RawMessagePart(
id: str,
message_id: str,
session_id: str,
part_type: str,
content: str,
part_index: int = 0,
tool_name: str | None = None,
tool_call_id: str | None = None,
tool_state: str | None = None,
compacted_at: int | None = None,
started_at: int | None = None,
completed_at: int | None = None,
token_estimate: int = 0,
)
Internal storage model for a message part row.
ImmutableStore
Append-only, SQLite-backed message log.
All writes use transactions. Part content is immutable — only status
metadata (tool_state, compacted_at, timing, output) can be updated
via update_part_status().
When a StorePool is supplied the store borrows a shared connection
from it — close() releases any local-only connection but leaves
pooled connections open (the pool owns their lifetime). This lets many
concurrent MnesisSession objects share one physical SQLite connection
without "database is locked" errors, and maps cleanly to a PostgreSQL
connection pool in the future.
Usage (standalone)::

    store = ImmutableStore(StoreConfig())
    await store.initialize()
    try:
        await store.create_session(session_id)
        await store.append_message(msg)
    finally:
        await store.close()  # closes the private connection

Usage (with pool)::

    pool = StorePool()
    store_a = ImmutableStore(config, pool=pool)
    store_b = ImmutableStore(config, pool=pool)
    await store_a.initialize()  # opens the shared connection once
    await store_b.initialize()  # reuses it
    await store_a.close()   # no-op — pool owns the connection
    await store_b.close()   # no-op
    await pool.close_all()  # actually closes the connection
initialize (async)
Open (or borrow) a database connection and apply the schema.
When a pool is set, the connection is acquired from the pool and the
schema is applied idempotently (CREATE TABLE IF NOT EXISTS).
When no pool is set, a private connection is opened; the caller is
responsible for calling close() to release it.
Raises:
| Type | Description |
|---|---|
| Error | If the database cannot be opened or the schema fails. |
close (async)
Release the database connection.
If the connection is owned by a pool this is a no-op — the pool
manages the connection lifetime. If the connection is private
(no pool was supplied) it is closed and set to None.
create_session (async)
create_session(
id: str,
*,
model_id: str = "",
provider_id: str = "",
agent: str = "default",
parent_id: str | None = None,
title: str | None = None,
metadata: dict[str, Any] | None = None,
system_prompt: str = "",
) -> Session
Insert a new session row.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | str | ULID-based session ID. | required |
| model_id | str | The model string used by this session. | '' |
| provider_id | str | The provider ID. | '' |
| agent | str | Agent role name. | 'default' |
| parent_id | str \| None | Optional parent session ID for sub-sessions. | None |
| title | str \| None | Optional human-readable title. | None |
| metadata | dict[str, Any] \| None | Optional JSON-serializable metadata dict. | None |
| system_prompt | str | System prompt for this session. | '' |
Returns:
| Type | Description |
|---|---|
| Session | The created Session. |

Raises:
| Type | Description |
|---|---|
| DuplicateIDError | If a session with this ID already exists. |
get_session (async)
Fetch a session by ID.
Raises:
| Type | Description |
|---|---|
| SessionNotFoundError | If no session with this ID exists. |
list_sessions (async)
list_sessions(
*,
parent_id: str | None = None,
active_only: bool = True,
limit: int = 100,
offset: int = 0,
) -> list[Session]
List sessions, newest first.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parent_id | str \| None | Filter to sessions with this parent (sub-sessions). | None |
| active_only | bool | Exclude soft-deleted sessions when True. | True |
| limit | int | Maximum number of sessions to return. | 100 |
| offset | int | Number of sessions to skip (for pagination). | 0 |

Returns:
| Type | Description |
|---|---|
| list[Session] | List of Session objects ordered by created_at DESC. |
soft_delete_session (async)
Mark a session as inactive (is_active=0). Messages and parts are retained.
append_message (async)
Append a message to the log.
For non-summary messages a context_items row is inserted atomically
in the same transaction so that the context-assembly view stays
consistent with the message log. Summary messages are NOT inserted
here — the compaction engine inserts their context_items row
separately as part of the atomic context swap.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | The Message to persist. Must have a pre-generated id. | required |

Returns:
| Type | Description |
|---|---|
| Message | The stored message (unmodified). |

Raises:
| Type | Description |
|---|---|
| SessionNotFoundError | If session_id does not exist. |
| DuplicateIDError | If a message with this ID already exists. |
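The atomic message/context_items pairing described above can be sketched with a plain sqlite3 transaction. This is a minimal illustration, not the real implementation: the table and column names here are simplified stand-ins for the actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE messages (id TEXT PRIMARY KEY, session_id TEXT, is_summary INTEGER);
    CREATE TABLE context_items (
        session_id TEXT, item_type TEXT, item_id TEXT, position INTEGER
    );
""")

def append_message(msg_id: str, session_id: str, is_summary: bool) -> None:
    # One transaction: the message row and its context_items row commit
    # together or not at all, keeping the context view consistent.
    with conn:
        conn.execute(
            "INSERT INTO messages (id, session_id, is_summary) VALUES (?, ?, ?)",
            (msg_id, session_id, int(is_summary)),
        )
        if not is_summary:  # summaries get their row during the context swap
            (pos,) = conn.execute(
                "SELECT COALESCE(MAX(position), -1) + 1 FROM context_items "
                "WHERE session_id = ?", (session_id,),
            ).fetchone()
            conn.execute(
                "INSERT INTO context_items VALUES (?, 'message', ?, ?)",
                (session_id, msg_id, pos),
            )

append_message("m1", "s1", is_summary=False)
append_message("m2", "s1", is_summary=True)  # no context_items row
rows = conn.execute("SELECT item_id, position FROM context_items").fetchall()
print(rows)  # [('m1', 0)]
```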
append_part (async)
Append a single part to a message.
Assigns part_index as max(existing_parts) + 1 within a transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| part | RawMessagePart | The RawMessagePart to persist. | required |

Returns:
| Type | Description |
|---|---|
| RawMessagePart | The stored part with its assigned part_index. |

Raises:
| Type | Description |
|---|---|
| MessageNotFoundError | If message_id does not exist. |
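The max(existing_parts) + 1 assignment can be sketched as a read-then-write inside a single transaction (schema simplified; names are illustrative, not the real tables):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE parts (id TEXT PRIMARY KEY, message_id TEXT, part_index INTEGER)"
)

def append_part(part_id: str, message_id: str) -> int:
    # SELECT and INSERT share one transaction so two concurrent appends
    # cannot both observe the same max and collide on part_index.
    with conn:
        (idx,) = conn.execute(
            "SELECT COALESCE(MAX(part_index), -1) + 1 FROM parts "
            "WHERE message_id = ?", (message_id,),
        ).fetchone()
        conn.execute("INSERT INTO parts VALUES (?, ?, ?)", (part_id, message_id, idx))
    return idx

first = append_part("p1", "m1")
second = append_part("p2", "m1")
print(first, second)  # 0 1
```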
update_part_status (async)
update_part_status(
part_id: str,
*,
tool_state: str | None = None,
compacted_at: int | None = None,
started_at: int | None = None,
completed_at: int | None = None,
output: str | None = None,
error_message: str | None = None,
) -> None
Update mutable status fields on a part.
This is the only permitted mutation of persisted data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| part_id | str | The part to update. | required |
| tool_state | str \| None | New tool lifecycle state. | None |
| compacted_at | int \| None | Unix ms timestamp — sets the pruning tombstone. | None |
| started_at | int \| None | Tool execution start timestamp. | None |
| completed_at | int \| None | Tool execution end timestamp. | None |
| output | str \| None | Tool result output string (merged into content JSON). | None |
| error_message | str \| None | Tool error message (merged into content JSON). | None |

Raises:
| Type | Description |
|---|---|
| PartNotFoundError | If the part does not exist. |
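The "merged into content JSON" behavior can be sketched as read-merge-write: the content blob stays append-only in spirit, with only status metadata folded in. This is an assumption-laden illustration (simplified schema, KeyError standing in for PartNotFoundError):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE parts (id TEXT PRIMARY KEY, content TEXT, tool_state TEXT)")
conn.execute(
    "INSERT INTO parts VALUES (?, ?, ?)",
    ("p1", json.dumps({"tool": "grep"}), "running"),
)

def update_part_status(part_id, *, tool_state=None, output=None):
    row = conn.execute("SELECT content FROM parts WHERE id = ?", (part_id,)).fetchone()
    if row is None:
        raise KeyError(part_id)  # stands in for PartNotFoundError
    content = json.loads(row[0])
    if output is not None:
        content["output"] = output  # merge the result into the content JSON
    with conn:
        conn.execute(
            "UPDATE parts SET content = ?, tool_state = COALESCE(?, tool_state) "
            "WHERE id = ?",
            (json.dumps(content), tool_state, part_id),
        )

update_part_status("p1", tool_state="completed", output="3 matches")
row = conn.execute("SELECT content, tool_state FROM parts WHERE id = 'p1'").fetchone()
print(row)
```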
update_message_tokens (async)
update_message_tokens(
message_id: str,
tokens: TokenUsage,
cost: float,
finish_reason: str,
) -> None
Update token usage fields on an assistant message after streaming completes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | The message to update. | required |
| tokens | TokenUsage | Final token usage from the provider response. | required |
| cost | float | Dollar cost of the response. | required |
| finish_reason | str | Provider finish reason. | required |
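A post-streaming token update amounts to a single UPDATE guarded by a rowcount check. A minimal sketch, assuming flattened token columns (the real store takes a TokenUsage object and raises MessageNotFoundError; here KeyError stands in):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id TEXT PRIMARY KEY, input_tokens INTEGER, "
    "output_tokens INTEGER, cost REAL, finish_reason TEXT)"
)
conn.execute("INSERT INTO messages VALUES ('m1', 0, 0, 0.0, '')")

def update_message_tokens(message_id, input_tokens, output_tokens, cost, finish_reason):
    with conn:
        cur = conn.execute(
            "UPDATE messages SET input_tokens = ?, output_tokens = ?, "
            "cost = ?, finish_reason = ? WHERE id = ?",
            (input_tokens, output_tokens, cost, finish_reason, message_id),
        )
        if cur.rowcount == 0:  # nothing matched: unknown message ID
            raise KeyError(message_id)

update_message_tokens("m1", 1200, 350, 0.0042, "stop")
row = conn.execute("SELECT finish_reason, cost FROM messages WHERE id = 'm1'").fetchone()
print(row)  # ('stop', 0.0042)
```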
get_message (async)
Fetch a single message by ID.
Raises:
| Type | Description |
|---|---|
| MessageNotFoundError | If no message with this ID exists. |
get_messages (async)
Fetch all messages for a session in chronological order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |
| since_message_id | str \| None | If provided, returns only messages created after the message with this ID. | None |

Returns:
| Type | Description |
|---|---|
| list[Message] | List of Message objects ordered by created_at ASC. |
get_parts (async)
Fetch all parts for a message, ordered by part_index ASC.
get_messages_with_parts (async)
get_messages_with_parts(
session_id: str, *, since_message_id: str | None = None
) -> list[MessageWithParts]
Efficiently fetch messages and their parts in two queries (no N+1).
Algorithm:
1. Fetch all matching messages.
2. Fetch all parts for those message IDs in a single IN query.
3. Group parts by message ID and join in Python.
4. Deserialize part JSON into typed MessagePart objects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |
| since_message_id | str \| None | If provided, returns only messages after this boundary. | None |

Returns:
| Type | Description |
|---|---|
| list[MessageWithParts] | List of MessageWithParts in chronological order. |
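The two-query join that avoids the N+1 pattern can be sketched as follows (simplified schema, untyped content strings in place of deserialized MessagePart objects):

```python
import sqlite3
from collections import defaultdict

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE messages (id TEXT PRIMARY KEY, session_id TEXT, created_at INTEGER);
    CREATE TABLE parts (id TEXT, message_id TEXT, part_index INTEGER, content TEXT);
""")
conn.executemany("INSERT INTO messages VALUES (?, ?, ?)",
                 [("m1", "s1", 1), ("m2", "s1", 2)])
conn.executemany("INSERT INTO parts VALUES (?, ?, ?, ?)",
                 [("p1", "m1", 0, "a"), ("p2", "m1", 1, "b"), ("p3", "m2", 0, "c")])

def get_messages_with_parts(session_id):
    # Query 1: all matching messages.
    ids = [r[0] for r in conn.execute(
        "SELECT id FROM messages WHERE session_id = ? ORDER BY created_at",
        (session_id,))]
    # Query 2: all their parts at once via a single IN clause.
    placeholders = ",".join("?" * len(ids))
    parts = conn.execute(
        f"SELECT message_id, content FROM parts "
        f"WHERE message_id IN ({placeholders}) "
        "ORDER BY message_id, part_index", ids).fetchall()
    # Group in Python instead of issuing one query per message.
    by_msg = defaultdict(list)
    for mid, content in parts:
        by_msg[mid].append(content)
    return [(mid, by_msg[mid]) for mid in ids]

result = get_messages_with_parts("s1")
print(result)  # [('m1', ['a', 'b']), ('m2', ['c'])]
```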
get_messages_with_parts_by_ids (async)
Fetch a specific set of messages (and their parts) by ID.
Returns only the requested messages in the same order as message_ids,
making context assembly O(k) in the number of context items rather than
O(n) in the total session length.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_ids | list[str] | Ordered list of message IDs to fetch. | required |

Returns:
| Type | Description |
|---|---|
| list[MessageWithParts] | List of MessageWithParts in the order of message_ids. |
get_last_summary_message (async)
Return the most recent message with is_summary=True, or None.
Uses the partial index on (session_id, is_summary) for O(log n) lookup.
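A partial index of this shape can be sketched directly in SQLite: only summary rows enter the index, so the lookup stays cheap even when ordinary messages dominate the table. Table and index names here are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id TEXT, session_id TEXT, "
    "is_summary INTEGER, created_at INTEGER)"
)
# Partial index: the WHERE clause keeps non-summary rows out of the index.
conn.execute(
    "CREATE INDEX idx_summary ON messages (session_id, created_at) "
    "WHERE is_summary = 1"
)
conn.executemany("INSERT INTO messages VALUES (?, ?, ?, ?)", [
    ("m1", "s1", 0, 1), ("m2", "s1", 1, 2),
    ("m3", "s1", 0, 3), ("m4", "s1", 1, 4),
])
# Most recent summary, or None if the session has no summaries yet.
row = conn.execute(
    "SELECT id FROM messages WHERE session_id = ? AND is_summary = 1 "
    "ORDER BY created_at DESC LIMIT 1", ("s1",)).fetchone()
print(row)  # ('m4',)
```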
store_file_reference (async)
Insert or replace a file reference (upsert by content_id).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ref | FileReference | The FileReference to persist. | required |

Returns:
| Type | Description |
|---|---|
| FileReference | The stored reference. |
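An upsert keyed on content_id can be sketched with SQLite's ON CONFLICT clause (available since SQLite 3.24); the columns below are a simplified stand-in for the real FileReference fields:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE file_refs (content_id TEXT PRIMARY KEY, path TEXT, stored_at INTEGER)"
)

def store_file_reference(content_id, path, stored_at):
    with conn:
        # Upsert: a repeat content_id updates the row in place.
        conn.execute(
            "INSERT INTO file_refs VALUES (?, ?, ?) "
            "ON CONFLICT(content_id) DO UPDATE SET "
            "path = excluded.path, stored_at = excluded.stored_at",
            (content_id, path, stored_at),
        )

store_file_reference("sha256:abc", "/tmp/a.txt", 1)
store_file_reference("sha256:abc", "/tmp/b.txt", 2)  # same content, new path
rows = conn.execute("SELECT * FROM file_refs").fetchall()
print(rows)  # [('sha256:abc', '/tmp/b.txt', 2)]
```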
get_file_reference (async)
Fetch a file reference by content hash. Returns None if not found.
get_file_reference_by_path (async)
Fetch the most recently stored file reference for a given path.
get_context_items (async)
Return the ordered context items for a session.
Each item is a (item_type, item_id) pair in ascending position
order — the canonical sequence that ContextBuilder assembles into
an LLM message list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session to query. | required |

Returns:
| Type | Description |
|---|---|
| list[tuple[str, str]] | List of (item_type, item_id) pairs in ascending position order. |
swap_context_items (async)
Atomically replace a set of context items with a single summary row.
This is the core of the O(1) compaction commit: within a single
transaction, all rows whose item_id is in remove_item_ids are
deleted and one new 'summary' row is inserted at the minimum
position of the removed items (i.e. it occupies the slot of the first
item being removed); items outside the compacted span keep their
original positions — no shifting required.
Used in two scenarios:
- Leaf summarisation: remove_item_ids contains the message IDs that were compacted (item_type='message').
- Condensation: remove_item_ids contains the parent summary node IDs (item_type='summary') being merged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| session_id | str | The session being compacted. | required |
| remove_item_ids | list[str] | Context item IDs to remove (any item_type). | required |
| summary_id | str | ID of the SummaryNode/condensed node that replaces them. | required |
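The atomic swap can be sketched as delete-plus-insert in one transaction, with the summary taking the minimum position of the removed items so surviving rows never shift. A minimal illustration against a simplified context_items table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE context_items (session_id TEXT, item_type TEXT, "
    "item_id TEXT, position INTEGER)"
)
conn.executemany("INSERT INTO context_items VALUES (?, ?, ?, ?)", [
    ("s1", "message", "m1", 0), ("s1", "message", "m2", 1),
    ("s1", "message", "m3", 2), ("s1", "message", "m4", 3),
])

def swap_context_items(session_id, remove_item_ids, summary_id):
    placeholders = ",".join("?" * len(remove_item_ids))
    # Single transaction: the delete and the insert commit atomically,
    # so readers never observe a half-compacted context.
    with conn:
        (min_pos,) = conn.execute(
            f"SELECT MIN(position) FROM context_items "
            f"WHERE session_id = ? AND item_id IN ({placeholders})",
            [session_id, *remove_item_ids]).fetchone()
        conn.execute(
            f"DELETE FROM context_items "
            f"WHERE session_id = ? AND item_id IN ({placeholders})",
            [session_id, *remove_item_ids])
        # Summary slots into the first removed position; m4 keeps position 3.
        conn.execute("INSERT INTO context_items VALUES (?, 'summary', ?, ?)",
                     (session_id, summary_id, min_pos))

swap_context_items("s1", ["m1", "m2", "m3"], "sum1")
remaining = conn.execute(
    "SELECT item_type, item_id, position FROM context_items ORDER BY position"
).fetchall()
print(remaining)  # [('summary', 'sum1', 0), ('message', 'm4', 3)]
```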