mnesis.operators

Mnesis parallel operator primitives.
AgenticMap
Spawn independent sub-agent sessions per input item.
Each item gets a full MnesisSession with multi-turn reasoning, tool access,
and automatic context management. Sub-sessions are isolated — they do not
inherit the parent's conversation history.
The parent session pays only O(1) context cost per item: it sees only the final output text, regardless of how many turns the sub-session needed.
Permission restrictions (always applied):
- Sub-sessions cannot spawn further sub-agents.
- read_only=True is not yet implemented; passing it raises NotImplementedError.
Example:

```python
agentic_map = AgenticMap(config.operators, model="anthropic/claude-opus-4-6")
async for result in agentic_map.run(
    inputs=repositories,
    agent_prompt_template="Analyze this repository and report quality issues:\n{{ item }}",
    read_only=False,
    max_turns=20,
):
    print(f"Repo: {result.input}\nFindings: {result.output_text[:200]}")
```
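Under the hood, a concurrency cap over independent per-item sessions behaves like a semaphore-bounded fan-out that yields results in completion order. A self-contained asyncio sketch of that pattern (the `bounded_map`, `worker`, and `demo` names are illustrative, not part of the Mnesis API):

```python
import asyncio

# Illustrative sketch of the operator's core pattern: run one coroutine per
# item, cap parallelism with a semaphore, and yield results as they finish
# (not in input order).
async def bounded_map(items, worker, concurrency=4):
    sem = asyncio.Semaphore(concurrency)

    async def guarded(item):
        async with sem:
            return await worker(item)

    tasks = [asyncio.create_task(guarded(i)) for i in items]
    for done in asyncio.as_completed(tasks):
        yield await done

async def demo():
    async def worker(n):
        await asyncio.sleep(0.01 * (3 - n))  # later items finish first
        return n * 10

    return [r async for r in bounded_map([0, 1, 2], worker)]

print(asyncio.run(demo()))  # completion order: [20, 10, 0]
```

The semaphore bounds how many workers run at once, while `as_completed` is what makes results arrive out of input order.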
run (async)
```python
run(
    inputs: list[Any],
    agent_prompt_template: str,
    *,
    model: str | None = None,
    concurrency: int | None = None,
    read_only: bool = True,
    tools: list[Any] | None = None,
    max_turns: int = 20,
    continuation_message: str = "",
    parent_session_id: str | None = None,
    agent: str = "general",
    lcm_config: MnesisConfig | None = None,
    db_path: str | None = None,
    pool: StorePool | None = None,
) -> AsyncGenerator[AgentMapResult, None]
```
Spawn sub-agent sessions in parallel, one per input item.
This is an async generator — iterate with async for, not await.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `inputs` | `list[Any]` | List of items. Each is rendered into the template as `{{ item }}`. | required |
| `agent_prompt_template` | `str` | Jinja2 template for the initial user message. Must reference the `item` variable. | required |
| `model` | `str \| None` | LLM model string for all sub-sessions. Falls back to the model set on the instance. | `None` |
| `concurrency` | `int \| None` | Maximum number of concurrent sub-sessions. | `None` |
| `read_only` | `bool` | Reserved; not yet implemented. | `True` |
| `tools` | `list[Any] \| None` | Optional tool definitions for sub-sessions. | `None` |
| `max_turns` | `int` | Maximum turns per sub-session before stopping. | `20` |
| `continuation_message` | `str` | Message sent as the user turn after turn 0. If an empty string (default), the sub-session runs only the initial turn and then stops, regardless of `max_turns`. | `''` |
| `parent_session_id` | `str \| None` | Optional parent session ID for lineage tracking. | `None` |
| `agent` | `str` | Agent role for sub-sessions. | `'general'` |
| `lcm_config` | `MnesisConfig \| None` | Mnesis config for sub-sessions. | `None` |
| `db_path` | `str \| None` | Override database path (useful for testing). | `None` |
| `pool` | `StorePool \| None` | Shared `StorePool` instance. | `None` |
Yields:

| Type | Description |
|---|---|
| `AsyncGenerator[AgentMapResult, None]` | `AgentMapResult` objects as sub-sessions complete (not in input order). |
Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | If `read_only=True` is passed. |
| `ValueError` | If neither the `model` argument nor a model on the instance is set. |
run_all (async)
```python
run_all(
    inputs: list[Any],
    agent_prompt_template: str,
    *,
    model: str | None = None,
    concurrency: int | None = None,
    read_only: bool = True,
    tools: list[Any] | None = None,
    max_turns: int = 20,
    continuation_message: str = "",
    parent_session_id: str | None = None,
    agent: str = "general",
    lcm_config: MnesisConfig | None = None,
    db_path: str | None = None,
    pool: StorePool | None = None,
) -> AgentMapBatch
```
Spawn all sub-agents and return an aggregate AgentMapBatch.
Convenience alternative to run() for callers who do not need streaming.
Collects all results before returning.
Returns:

| Type | Description |
|---|---|
| `AgentMapBatch` | `AgentMapBatch` with the collected results. |
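The difference between streaming results and collecting them all can be sketched with plain asyncio, independent of Mnesis (the `stream`, `collect`, and `demo` names below are illustrative):

```python
import asyncio

# Sketch of why a streaming API (async generator) and a collecting API
# (single awaitable) differ: streaming lets the caller act on each result
# as it arrives (and stop early); collecting drains everything first.
async def stream(items):
    for item in items:
        await asyncio.sleep(0)  # stand-in for real sub-agent work
        yield item.upper()

async def collect(items):
    return [r async for r in stream(items)]  # drains the generator

async def demo():
    first = None
    async for r in stream(["a", "b"]):  # streaming: act on each result
        first = r
        break                            # e.g. stop early
    batch = await collect(["a", "b"])    # collecting: all-at-once
    return first, batch

print(asyncio.run(demo()))  # ('A', ['A', 'B'])
```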
AgentMapBatch
Bases: BaseModel
Aggregate result of a full AgenticMap.run_all() call.
AgentMapResult
Bases: BaseModel
The result of running a single sub-agent through AgenticMap.
LLMMap
```python
LLMMap(
    config: OperatorConfig,
    estimator: TokenEstimator | None = None,
    event_bus: EventBus | None = None,
    model: str | None = None,
)
```
Stateless parallel LLM processing over a list of items.
Each item is processed in an independent, single-turn LLM call with no
session state. The parent session context is not consumed or affected.
Results stream back via async for as they complete.
Example:

```python
class ExtractedData(BaseModel):
    title: str
    summary: str
    keywords: list[str]

llm_map = LLMMap(config.operators, model="anthropic/claude-haiku-3-5")
async for result in llm_map.run(
    inputs=documents,
    prompt_template="Extract metadata from this document:\n\n{{ item }}",
    output_schema=ExtractedData,
):
    if result.success:
        print(result.output)
```
run (async)
```python
run(
    inputs: list[Any],
    prompt_template: str,
    output_schema: dict[str, Any] | type[BaseModel],
    *,
    model: str | None = None,
    concurrency: int | None = None,
    max_retries: int | None = None,
    system_prompt: str | None = None,
    temperature: float = 0.0,
    timeout_secs: float = 60.0,
    retry_guidance: str = _DEFAULT_RETRY_GUIDANCE,
) -> AsyncGenerator[MapResult, None]
```
Process inputs in parallel with the given prompt template.
This is an async generator — iterate with async for, not await.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `inputs` | `list[Any]` | List of items to process. Each is rendered into the template as `{{ item }}`. | required |
| `prompt_template` | `str` | Jinja2 template string. Must reference the `item` variable. | required |
| `output_schema` | `dict[str, Any] \| type[BaseModel]` | Expected output shape. Pass a Pydantic model class or a JSON schema dict. | required |
| `model` | `str \| None` | LLM model string in litellm format (keyword-only). Falls back to the model set on the instance. | `None` |
| `concurrency` | `int \| None` | Override the config concurrency limit. | `None` |
| `max_retries` | `int \| None` | Override the config max retries per item. | `None` |
| `system_prompt` | `str \| None` | Optional system prompt for all calls. | `None` |
| `temperature` | `float` | Sampling temperature. Use 0 for deterministic extraction. | `0.0` |
| `timeout_secs` | `float` | Per-item timeout in seconds. | `60.0` |
| `retry_guidance` | `str` | Message appended to the prompt on retry after a validation failure. Defaults to a generic JSON correction hint. Override to avoid leaking schema details into the LLM context. | `_DEFAULT_RETRY_GUIDANCE` |
Yields:

| Type | Description |
|---|---|
| `AsyncGenerator[MapResult, None]` | `MapResult` objects as they complete (not in input order). Use `result.success` to check whether each item succeeded. |
Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither the `model` argument nor a model on the instance is set. |
| `TypeError` | If `output_schema` is neither a dict nor a `BaseModel` subclass. |
| `ImportError` | If `litellm` is not installed. |
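The retry behavior that `retry_guidance` controls can be sketched as a plain validation-retry loop; `call_with_retries`, `fake_llm`, and the guidance string below are illustrative stand-ins, not Mnesis internals:

```python
import json

# Sketch of a retry-on-validation-failure loop: when the model's reply
# fails to parse or validate, guidance text is appended to the prompt
# and the call is retried, up to max_retries extra attempts.
def call_with_retries(prompt, llm, validate, retry_guidance, max_retries=2):
    attempt_prompt = prompt
    for attempt in range(max_retries + 1):
        raw = llm(attempt_prompt)
        try:
            data = json.loads(raw)
            validate(data)
            return data
        except (json.JSONDecodeError, ValueError):
            # append guidance so the next attempt can self-correct
            attempt_prompt = prompt + "\n\n" + retry_guidance
    raise ValueError(f"failed after {max_retries + 1} attempts")

replies = iter(['not json', '{"title": "ok"}'])  # first attempt fails

def fake_llm(prompt):
    return next(replies)

def validate(data):
    if "title" not in data:
        raise ValueError("missing title")

out = call_with_retries("Extract metadata", fake_llm, validate,
                        retry_guidance="Return valid JSON only.")
print(out)  # {'title': 'ok'}
```

Keeping the guidance generic (rather than echoing the full schema) is what avoids leaking schema details into the retried prompt.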
run_all (async)
```python
run_all(
    inputs: list[Any],
    prompt_template: str,
    output_schema: dict[str, Any] | type[BaseModel],
    *,
    model: str | None = None,
    concurrency: int | None = None,
    max_retries: int | None = None,
    system_prompt: str | None = None,
    temperature: float = 0.0,
    timeout_secs: float = 60.0,
    retry_guidance: str = _DEFAULT_RETRY_GUIDANCE,
) -> MapBatch
```
Process all inputs and return an aggregate MapBatch.
Convenience alternative to run() for callers who do not need streaming.
Collects all results before returning.
Returns:

| Type | Description |
|---|---|
| `MapBatch` | `MapBatch` with the collected results. |
MapBatch
Bases: BaseModel
Aggregate result of a full LLMMap.run_all() call.
MapResult
Bases: BaseModel
The result of processing a single item through LLMMap.