mnesis.operators.llm_map

LLMMap operator: stateless parallel LLM processing.
MapResult

Bases: BaseModel

The result of processing a single item through LLMMap.
MapBatch

Bases: BaseModel

Aggregate result of a full LLMMap.run_all() call.
LLMMap

LLMMap(
    config: OperatorConfig,
    estimator: TokenEstimator | None = None,
    event_bus: EventBus | None = None,
    model: str | None = None,
)
Stateless parallel LLM processing over a list of items.
Each item is processed in an independent, single-turn LLM call with no
session state. The parent session context is not consumed or affected.
Results stream back via async for as they complete.
Example::

    class ExtractedData(BaseModel):
        title: str
        summary: str
        keywords: list[str]

    llm_map = LLMMap(config.operators, model="anthropic/claude-haiku-3-5")

    async for result in llm_map.run(
        inputs=documents,
        prompt_template="Extract metadata from this document:\n\n{{ item }}",
        output_schema=ExtractedData,
    ):
        if result.success:
            print(result.output)
run (async)

run(
    inputs: list[Any],
    prompt_template: str,
    output_schema: dict[str, Any] | type[BaseModel],
    *,
    model: str | None = None,
    concurrency: int | None = None,
    max_retries: int | None = None,
    system_prompt: str | None = None,
    temperature: float = 0.0,
    timeout_secs: float = 60.0,
    retry_guidance: str = _DEFAULT_RETRY_GUIDANCE,
) -> AsyncGenerator[MapResult, None]
Process inputs in parallel with the given prompt template.
This is an async generator — iterate with async for, not await.
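The async-generator contract can be shown with a toy stand-in (the `fake_run` generator below is hypothetical, not part of mnesis); the point is that you iterate with `async for` rather than awaiting the call itself:

```python
import asyncio
from typing import AsyncGenerator

# Toy stand-in for run(): an async generator is consumed with
# `async for`; awaiting it directly would raise a TypeError.
async def fake_run(items: list[str]) -> AsyncGenerator[str, None]:
    for item in items:
        await asyncio.sleep(0)  # yield control, as real LLM I/O would
        yield item.upper()

async def main() -> list[str]:
    results = []
    async for result in fake_run(["a", "b"]):
        results.append(result)
    return results

collected = asyncio.run(main())
print(collected)  # -> ['A', 'B']
```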
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| inputs | list[Any] | List of items to process. Each is rendered into the prompt template as `item`. | required |
| prompt_template | str | Jinja2 template string. Must reference `{{ item }}`. | required |
| output_schema | dict[str, Any] \| type[BaseModel] | Expected output shape. Pass a Pydantic model class or a JSON-schema dict. | required |
| model | str \| None | LLM model string in litellm format (keyword-only). Falls back to the model set on the LLMMap instance. | None |
| concurrency | int \| None | Override config concurrency limit. | None |
| max_retries | int \| None | Override config max retries per item. | None |
| system_prompt | str \| None | Optional system prompt for all calls. | None |
| temperature | float | Sampling temperature. Use 0 for deterministic extraction. | 0.0 |
| timeout_secs | float | Per-item timeout in seconds. | 60.0 |
| retry_guidance | str | Message appended to the prompt on retry after a validation failure. Defaults to a generic JSON correction hint. Override to avoid leaking schema details into the LLM context. | _DEFAULT_RETRY_GUIDANCE |
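Since `prompt_template` is a Jinja2 template, each input item is substituted for `{{ item }}` before the per-item call. A minimal sketch of that rendering step (the `documents` list is illustrative; `jinja2.Template` is the standard Jinja2 API):

```python
from jinja2 import Template

# Illustration only: how each input item might be rendered into the
# prompt template before its independent LLM call is dispatched.
prompt_template = "Extract metadata from this document:\n\n{{ item }}"
documents = [
    "Quarterly report: revenue up 12%.",
    "Meeting notes from Tuesday.",
]

prompts = [Template(prompt_template).render(item=doc) for doc in documents]
print(prompts[0])
```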
Yields:

| Type | Description |
|---|---|
| AsyncGenerator[MapResult, None] | MapResult objects as they complete (not in input order). Use `result.success` to distinguish successful items from failures. |
Raises:

| Type | Description |
|---|---|
| ValueError | If neither a `model` argument nor a model on the instance is set. |
| TypeError | If `output_schema` is not a dict or a BaseModel subclass. |
| ImportError | If a required optional dependency is not installed. |
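The two accepted `output_schema` forms correspond directly: a Pydantic model class carries the same shape as its JSON schema. A sketch of that correspondence, reusing `ExtractedData` from the class example (`model_json_schema` is the Pydantic v2 API):

```python
from pydantic import BaseModel

class ExtractedData(BaseModel):
    title: str
    summary: str
    keywords: list[str]

# The dict form of output_schema is just a JSON schema; a Pydantic
# class can be expanded to one with model_json_schema() (Pydantic v2).
schema_dict = ExtractedData.model_json_schema()
print(sorted(schema_dict["properties"]))  # -> ['keywords', 'summary', 'title']
```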
run_all (async)

run_all(
    inputs: list[Any],
    prompt_template: str,
    output_schema: dict[str, Any] | type[BaseModel],
    *,
    model: str | None = None,
    concurrency: int | None = None,
    max_retries: int | None = None,
    system_prompt: str | None = None,
    temperature: float = 0.0,
    timeout_secs: float = 60.0,
    retry_guidance: str = _DEFAULT_RETRY_GUIDANCE,
) -> MapBatch
Process all inputs and return an aggregate MapBatch.
Convenience alternative to run() for callers who do not need streaming.
Collects all results before returning.
Returns:

| Type | Description |
|---|---|
| MapBatch | MapBatch with all item results collected. |