mnesis.operators

Mnesis parallel operator primitives.

AgenticMap

AgenticMap(
    config: OperatorConfig,
    event_bus: EventBus | None = None,
    model: str | None = None,
)

Spawn independent sub-agent sessions per input item.

Each item gets a full MnesisSession with multi-turn reasoning, tool access, and automatic context management. Sub-sessions are isolated — they do not inherit the parent's conversation history.

The parent session pays only O(1) context cost per item: it sees only the final output text, regardless of how many turns the sub-session needed.

Permission restrictions (always applied):

- Sub-sessions cannot spawn further sub-agents.
- read_only=True is not yet implemented; passing it raises NotImplementedError, so callers must pass read_only=False explicitly.

Example::

agentic_map = AgenticMap(config.operators, model="anthropic/claude-opus-4-6")
async for result in agentic_map.run(
    inputs=repositories,
    agent_prompt_template="Analyze this repository and report quality issues:\n{{ item }}",
    read_only=False,
    max_turns=20,
):
    print(f"Repo: {result.input}\nFindings: {result.output_text[:200]}")

run async

run(
    inputs: list[Any],
    agent_prompt_template: str,
    *,
    model: str | None = None,
    concurrency: int | None = None,
    read_only: bool = True,
    tools: list[Any] | None = None,
    max_turns: int = 20,
    continuation_message: str = "",
    parent_session_id: str | None = None,
    agent: str = "general",
    lcm_config: MnesisConfig | None = None,
    db_path: str | None = None,
    pool: StorePool | None = None,
) -> AsyncGenerator[AgentMapResult, None]

Spawn sub-agent sessions in parallel, one per input item.

This is an async generator — iterate with async for, not await.

Parameters:

inputs (list[Any], required)
    List of items. Each is rendered into {{ item }}.

agent_prompt_template (str, required)
    Jinja2 template for the initial user message. Must reference the item variable.

model (str | None, default None)
    LLM model string for all sub-sessions. Falls back to the model set on __init__ if not provided here.

concurrency (int | None, default None)
    Maximum number of concurrent sub-sessions.

read_only (bool, default True)
    Reserved. Passing True raises NotImplementedError until enforcement is implemented; pass read_only=False explicitly.

tools (list[Any] | None, default None)
    Optional tool definitions for sub-sessions.

max_turns (int, default 20)
    Maximum turns per sub-session before stopping.

continuation_message (str, default '')
    Message sent as the user turn after turn 0. If an empty string (default), the sub-session runs only the initial turn and then stops, regardless of max_turns.

parent_session_id (str | None, default None)
    Optional parent session ID for lineage tracking.

agent (str, default 'general')
    Agent role for sub-sessions.

lcm_config (MnesisConfig | None, default None)
    Mnesis config for sub-sessions. Defaults to MnesisConfig().

db_path (str | None, default None)
    Override database path (useful for testing).

pool (StorePool | None, default None)
    Shared StorePool so all sub-sessions use one connection. If omitted, a fresh pool is created and closed automatically at the end of this call.
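The continuation_message rule above has a subtle consequence that is easy to miss: with the default empty string, max_turns is effectively ignored. A minimal sketch of that stopping rule as documented (planned_turns is an illustrative helper, not part of the library, and it ignores early stopping by the model):

```python
def planned_turns(max_turns: int, continuation_message: str) -> int:
    """Upper bound on turns a sub-session will run, per the documented rule."""
    # Turn 0 is the rendered agent prompt; later turns replay
    # continuation_message as the user message.
    if not continuation_message:
        return 1          # initial turn only, regardless of max_turns
    return max_turns      # otherwise max_turns is the ceiling

planned_turns(20, "")          # 1: empty message stops after turn 0
planned_turns(5, "Continue")   # up to 5 turns
```

So a non-empty continuation_message is what opts a sub-session into multi-turn execution.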

Yields:

AgentMapResult
    AgentMapResult objects as sub-sessions complete (not in input order).

Raises:

NotImplementedError
    If read_only=True is passed (not yet implemented).

ValueError
    If neither model nor the constructor model is set, or if agent_prompt_template does not reference item.
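The ValueError above reflects a contract both operators share: the template must reference item, and each input item is rendered into it. A rough stdlib-only sketch of that contract (render_item is illustrative; the real operator renders with Jinja2 rather than plain substitution):

```python
def render_item(template: str, item: object) -> str:
    """Sketch of the per-item rendering contract described above (assumed)."""
    # Guard mirrors the documented ValueError for templates that never
    # mention item; the library's actual check may be stricter.
    if "item" not in template:
        raise ValueError("agent_prompt_template must reference {{ item }}")
    # Plain substitution stands in for Jinja2 rendering.
    return template.replace("{{ item }}", str(item))

prompt = render_item(
    "Analyze this repository and report quality issues:\n{{ item }}",
    "acme/widgets",
)
```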

run_all async

run_all(
    inputs: list[Any],
    agent_prompt_template: str,
    *,
    model: str | None = None,
    concurrency: int | None = None,
    read_only: bool = True,
    tools: list[Any] | None = None,
    max_turns: int = 20,
    continuation_message: str = "",
    parent_session_id: str | None = None,
    agent: str = "general",
    lcm_config: MnesisConfig | None = None,
    db_path: str | None = None,
    pool: StorePool | None = None,
) -> AgentMapBatch

Spawn all sub-agents and return an aggregate AgentMapBatch.

Convenience alternative to run() for callers who do not need streaming. Collects all results before returning.

Returns:

AgentMapBatch
    AgentMapBatch with .successes, .failures, and .total_attempts.
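The relationship between run() and run_all() can be sketched as draining the async generator and partitioning results by success. Everything below is illustrative: DemoResult and DemoBatch stand in for AgentMapResult and AgentMapBatch, and the real implementation may differ.

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class DemoResult:
    input: str
    success: bool

@dataclass
class DemoBatch:
    successes: list = field(default_factory=list)
    failures: list = field(default_factory=list)

    @property
    def total_attempts(self) -> int:
        return len(self.successes) + len(self.failures)

async def demo_run():
    # Stand-in for AgenticMap.run(); yields as sub-sessions finish.
    for name, ok in [("repo-a", True), ("repo-b", False), ("repo-c", True)]:
        yield DemoResult(name, ok)

async def demo_run_all() -> DemoBatch:
    # Stand-in for run_all(): collect everything, partition by success.
    batch = DemoBatch()
    async for r in demo_run():
        (batch.successes if r.success else batch.failures).append(r)
    return batch

batch = asyncio.run(demo_run_all())
# batch.total_attempts == 3, with one failure
```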

AgentMapBatch

Bases: BaseModel

Aggregate result of a full AgenticMap.run_all() call.

AgentMapResult

Bases: BaseModel

The result of running a single sub-agent through AgenticMap.

LLMMap

LLMMap(
    config: OperatorConfig,
    estimator: TokenEstimator | None = None,
    event_bus: EventBus | None = None,
    model: str | None = None,
)

Stateless parallel LLM processing over a list of items.

Each item is processed in an independent, single-turn LLM call with no session state. The parent session context is not consumed or affected. Results stream back via async for as they complete.

Example::

class ExtractedData(BaseModel):
    title: str
    summary: str
    keywords: list[str]

llm_map = LLMMap(config.operators, model="anthropic/claude-haiku-3-5")
async for result in llm_map.run(
    inputs=documents,
    prompt_template="Extract metadata from this document:\n\n{{ item }}",
    output_schema=ExtractedData,
):
    if result.success:
        print(result.output)

run async

run(
    inputs: list[Any],
    prompt_template: str,
    output_schema: dict[str, Any] | type[BaseModel],
    *,
    model: str | None = None,
    concurrency: int | None = None,
    max_retries: int | None = None,
    system_prompt: str | None = None,
    temperature: float = 0.0,
    timeout_secs: float = 60.0,
    retry_guidance: str = _DEFAULT_RETRY_GUIDANCE,
) -> AsyncGenerator[MapResult, None]

Process inputs in parallel with the given prompt template.

This is an async generator — iterate with async for, not await.

Parameters:

inputs (list[Any], required)
    List of items to process. Each is rendered into {{ item }}.

prompt_template (str, required)
    Jinja2 template string. Must reference item.

output_schema (dict[str, Any] | type[BaseModel], required)
    Expected output shape. Pass a Pydantic BaseModel subclass or a JSON Schema dict. When a dict is passed, jsonschema must be installed (pip install jsonschema). Responses are validated and retried on failure.

model (str | None, default None)
    LLM model string in litellm format (keyword-only). Falls back to the model set on __init__ if not provided here.

concurrency (int | None, default None)
    Override config concurrency limit.

max_retries (int | None, default None)
    Override config max retries per item.

system_prompt (str | None, default None)
    Optional system prompt for all calls.

temperature (float, default 0.0)
    Sampling temperature. Use 0 for deterministic extraction.

timeout_secs (float, default 60.0)
    Per-item timeout in seconds.

retry_guidance (str, default _DEFAULT_RETRY_GUIDANCE)
    Message appended to the prompt on retry after a validation failure. Defaults to a generic JSON correction hint. Override to avoid leaking schema details into the LLM context.

Yields:

MapResult
    MapResult objects as they complete (not in input order). Use result.input to correlate with the original input.

Raises:

ValueError
    If neither model nor the constructor model is set, or if prompt_template does not reference item.

TypeError
    If output_schema is a type that is not a BaseModel subclass.

ImportError
    If output_schema is a dict and jsonschema is not installed.
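The retry behavior described for output_schema and retry_guidance amounts to a validate-and-retry loop. A simplified stdlib-only sketch under those assumptions (validate stands in for jsonschema or Pydantic validation, and call_with_retries is illustrative, not the library's internals):

```python
import json

def validate(payload: object, required_keys: list[str]) -> bool:
    # Stand-in for real schema validation.
    return isinstance(payload, dict) and all(k in payload for k in required_keys)

def call_with_retries(call, prompt: str, required_keys: list[str],
                      retry_guidance: str = "Return only valid JSON matching the schema.",
                      max_retries: int = 2):
    attempt_prompt = prompt
    for attempt in range(1, max_retries + 2):
        raw = call(attempt_prompt)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = None
        if validate(data, required_keys):
            return data, attempt
        # On failure, retry with the guidance appended to the prompt.
        attempt_prompt = f"{prompt}\n\n{retry_guidance}"
    return None, max_retries + 1

# A fake model: bad JSON on the first call, valid JSON on the second.
responses = iter(["not json", '{"title": "T", "summary": "S"}'])
data, attempts = call_with_retries(lambda p: next(responses),
                                   "Extract metadata.", ["title", "summary"])
# data parses and validates on the second attempt
```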

run_all async

run_all(
    inputs: list[Any],
    prompt_template: str,
    output_schema: dict[str, Any] | type[BaseModel],
    *,
    model: str | None = None,
    concurrency: int | None = None,
    max_retries: int | None = None,
    system_prompt: str | None = None,
    temperature: float = 0.0,
    timeout_secs: float = 60.0,
    retry_guidance: str = _DEFAULT_RETRY_GUIDANCE,
) -> MapBatch

Process all inputs and return an aggregate MapBatch.

Convenience alternative to run() for callers who do not need streaming. Collects all results before returning.

Returns:

MapBatch
    MapBatch with .successes, .failures, and .total_attempts.
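Both operators bound parallelism via the concurrency parameter and yield results in completion order rather than input order. One plausible shape for that mechanism, sketched with asyncio primitives (bounded_map is an illustrative name, not part of the library):

```python
import asyncio

async def bounded_map(items, worker, concurrency: int = 2):
    # Semaphore caps in-flight items at `concurrency`.
    sem = asyncio.Semaphore(concurrency)

    async def one(item):
        async with sem:
            return await worker(item)

    tasks = [asyncio.create_task(one(i)) for i in items]
    results = []
    for done in asyncio.as_completed(tasks):
        results.append(await done)  # completion order, not input order
    return results

async def double(x: int) -> int:
    await asyncio.sleep(0.01 * x)  # simulate per-item LLM latency
    return x * 2

out = asyncio.run(bounded_map([3, 1, 2], double))
# sorted(out) == [2, 4, 6]
```

This is why the Yields notes advise correlating results via result.input instead of relying on order.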

MapBatch

Bases: BaseModel

Aggregate result of a full LLMMap.run_all() call.

MapResult

Bases: BaseModel

The result of processing a single item through LLMMap.