
mnesis.operators.llm_map

llm_map

LLMMap operator — stateless parallel LLM processing.

MapResult

Bases: BaseModel

The result of processing a single item through LLMMap.

MapBatch

Bases: BaseModel

Aggregate result of a full LLMMap.run_all() call.

LLMMap

LLMMap(
    config: OperatorConfig,
    estimator: TokenEstimator | None = None,
    event_bus: EventBus | None = None,
    model: str | None = None,
)

Stateless parallel LLM processing over a list of items.

Each item is processed in an independent, single-turn LLM call with no session state. The parent session context is not consumed or affected. Results stream back via async for as they complete.

Example::

from pydantic import BaseModel

from mnesis.operators.llm_map import LLMMap

class ExtractedData(BaseModel):
    title: str
    summary: str
    keywords: list[str]

llm_map = LLMMap(config.operators, model="anthropic/claude-haiku-3-5")
async for result in llm_map.run(
    inputs=documents,
    prompt_template="Extract metadata from this document:\n\n{{ item }}",
    output_schema=ExtractedData,
):
    if result.success:
        print(result.output)

run async

run(
    inputs: list[Any],
    prompt_template: str,
    output_schema: dict[str, Any] | type[BaseModel],
    *,
    model: str | None = None,
    concurrency: int | None = None,
    max_retries: int | None = None,
    system_prompt: str | None = None,
    temperature: float = 0.0,
    timeout_secs: float = 60.0,
    retry_guidance: str = _DEFAULT_RETRY_GUIDANCE,
) -> AsyncGenerator[MapResult, None]

Process inputs in parallel with the given prompt template.

This is an async generator — iterate with async for, not await.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `inputs` | `list[Any]` | List of items to process. Each is rendered into `{{ item }}`. | *required* |
| `prompt_template` | `str` | Jinja2 template string. Must reference `item`. | *required* |
| `output_schema` | `dict[str, Any] \| type[BaseModel]` | Expected output shape. Pass a Pydantic `BaseModel` subclass or a JSON Schema dict. When a dict is passed, `jsonschema` must be installed (`pip install jsonschema`). Responses are validated and retried on failure. | *required* |
| `model` | `str \| None` | LLM model string in litellm format (keyword-only). Falls back to the model set on `__init__` if not provided here. | `None` |
| `concurrency` | `int \| None` | Override config concurrency limit. | `None` |
| `max_retries` | `int \| None` | Override config max retries per item. | `None` |
| `system_prompt` | `str \| None` | Optional system prompt for all calls. | `None` |
| `temperature` | `float` | Sampling temperature. Use `0` for deterministic extraction. | `0.0` |
| `timeout_secs` | `float` | Per-item timeout in seconds. | `60.0` |
| `retry_guidance` | `str` | Message appended to the prompt on retry after a validation failure. Defaults to a generic JSON correction hint. Override to avoid leaking schema details into the LLM context. | `_DEFAULT_RETRY_GUIDANCE` |

Yields:

| Type | Description |
|---|---|
| `AsyncGenerator[MapResult, None]` | `MapResult` objects as they complete (not in input order). Use `result.input` to correlate with the original input. |
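Because results arrive in completion order rather than input order, callers typically key them by `result.input`. A minimal self-contained sketch of that correlation pattern — the `MapResult` stand-in and `fake_run` simulator below are illustrative, not the library's internals:

```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class MapResult:
    # Illustrative stand-in mirroring the documented fields.
    input: Any
    output: Any
    success: bool

async def fake_run(inputs):
    # Simulate out-of-order completion: later items may finish first.
    async def work(item, delay):
        await asyncio.sleep(delay)
        return MapResult(input=item, output=item.upper(), success=True)

    tasks = [asyncio.create_task(work(item, d))
             for item, d in zip(inputs, (0.03, 0.01, 0.02))]
    for task in asyncio.as_completed(tasks):
        yield await task

async def main():
    by_input = {}
    async for result in fake_run(["a", "b", "c"]):
        if result.success:
            # Correlate each streamed result back to its original input.
            by_input[result.input] = result.output
    return by_input

by_input = asyncio.run(main())
```

Whatever order the items complete in, the final mapping is the same, which is why keying by `result.input` is the recommended pattern.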

Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither `model` nor the constructor model is set, or if `prompt_template` does not reference `item`. |
| `TypeError` | If `output_schema` is a type that is not a `BaseModel` subclass. |
| `ImportError` | If `output_schema` is a dict and `jsonschema` is not installed. |
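The retry behaviour can be pictured as a validate-and-retry loop: when a response fails validation, `retry_guidance` is appended to the prompt and the call is repeated, up to `max_retries` extra attempts. A rough, self-contained sketch of that loop — the `validate` rule and the stubbed model are placeholders, not the library's implementation:

```python
import json

def validate(raw: str) -> dict:
    # Placeholder validation: require valid JSON containing a "title" key.
    data = json.loads(raw)
    if "title" not in data:
        raise ValueError("missing 'title'")
    return data

def run_with_retries(call_llm, prompt: str, max_retries: int = 2,
                     retry_guidance: str = "Return valid JSON matching the schema.") -> dict:
    attempt_prompt = prompt
    for attempt in range(max_retries + 1):
        raw = call_llm(attempt_prompt)
        try:
            return validate(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            if attempt == max_retries:
                raise
            # Append guidance so the model can self-correct on the next attempt.
            attempt_prompt = prompt + "\n\n" + retry_guidance

# Stub model: returns malformed output once, then a valid response.
responses = iter(['not json', '{"title": "ok"}'])
result = run_with_retries(lambda p: next(responses), "Extract metadata.")
```

This is also why overriding `retry_guidance` matters: whatever string is supplied ends up verbatim in the LLM context on every retry.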

run_all async

run_all(
    inputs: list[Any],
    prompt_template: str,
    output_schema: dict[str, Any] | type[BaseModel],
    *,
    model: str | None = None,
    concurrency: int | None = None,
    max_retries: int | None = None,
    system_prompt: str | None = None,
    temperature: float = 0.0,
    timeout_secs: float = 60.0,
    retry_guidance: str = _DEFAULT_RETRY_GUIDANCE,
) -> MapBatch

Process all inputs and return an aggregate MapBatch.

Convenience alternative to run() for callers who do not need streaming. Collects all results before returning.

Returns:

| Type | Description |
|---|---|
| `MapBatch` | `MapBatch` with `.successes`, `.failures`, and `.total_attempts`. |
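Conceptually, `run_all()` amounts to draining `run()` and bucketing the results. The field names `.successes`, `.failures`, and `.total_attempts` come from the docs above; the `attempts` field on the stand-in result and the rest of this sketch are illustrative assumptions:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class MapResult:
    input: Any
    output: Any
    success: bool
    attempts: int = 1  # hypothetical field: attempts consumed for this item

@dataclass
class MapBatch:
    successes: list = field(default_factory=list)
    failures: list = field(default_factory=list)
    total_attempts: int = 0

def collect(results) -> MapBatch:
    # Bucket streamed results into successes/failures and tally attempts.
    batch = MapBatch()
    for r in results:
        (batch.successes if r.success else batch.failures).append(r)
        batch.total_attempts += r.attempts
    return batch

batch = collect([
    MapResult("doc1", {"title": "A"}, True),
    MapResult("doc2", None, False, attempts=3),
])
```

Use `run_all()` when only the aggregate matters; prefer `run()` when you want to act on each result as it completes.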