MapReduceComputation runs an analysis function across every trace in a run, aggregates results up the entity hierarchy, and optionally persists computed metadata to ClickHouse and Kestrel.

Defining a pipeline

from manta import MapReduceComputation, EntityLevel

pipeline = (
    MapReduceComputation("My analysis", project_id="prj_xxx")
    .add_level(EntityLevel.TRACE, map_fn, TraceMetrics)
    .add_level(EntityLevel.DATAPOINT, reduce_fn, DatapointMetrics)
)
Each level has an entity type, a compute function, and an output schema. The first level is always TRACE and takes a map function; higher levels take reduce functions.
If your map function calls an LLM, set uses_llm=True on that level and list any extra dependencies in pip_install:
pipeline = MapReduceComputation(
    "Failure analysis",
    project_id="prj_xxx",
    pip_install=["openai"],
)
pipeline.add_level(EntityLevel.TRACE, map_fn, TraceMetrics, uses_llm=True)
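The snippets above do not show what map_fn, reduce_fn, or the output schemas look like. The following is a plain-Python sketch of the map-then-reduce idea only, with no Manta import. The function signatures, the dict shape of a trace, the field names, and the use of dataclasses for the schemas are all assumptions made for illustration; check the Manta reference for the real contracts.

```python
from collections import defaultdict
from dataclasses import dataclass


# Hypothetical output schemas. The real ones may need a Manta base class.
@dataclass
class TraceMetrics:
    datapoint_id: str
    num_steps: int
    failed: bool


@dataclass
class DatapointMetrics:
    datapoint_id: str
    num_traces: int
    failure_rate: float


def map_fn(trace: dict) -> TraceMetrics:
    """Map step: compute metrics for one trace (assumed here to be a dict)."""
    return TraceMetrics(
        datapoint_id=trace["datapoint_id"],
        num_steps=len(trace["steps"]),
        failed=trace["status"] != "ok",
    )


def reduce_fn(datapoint_id: str, children: list) -> DatapointMetrics:
    """Reduce step: aggregate the trace-level outputs under one datapoint."""
    failures = sum(1 for child in children if child.failed)
    return DatapointMetrics(datapoint_id, len(children), failures / len(children))


# Made-up traces, used only to exercise the two functions.
traces = [
    {"datapoint_id": "dp_1", "status": "ok", "steps": ["plan", "act"]},
    {"datapoint_id": "dp_1", "status": "error", "steps": ["plan"]},
    {"datapoint_id": "dp_2", "status": "ok", "steps": []},
]

# Map over every trace, then group by parent and reduce one level up.
trace_metrics = [map_fn(trace) for trace in traces]
grouped = defaultdict(list)
for metrics in trace_metrics:
    grouped[metrics.datapoint_id].append(metrics)
datapoint_metrics = [reduce_fn(dp_id, children) for dp_id, children in grouped.items()]
```

In a real pipeline the grouping and fan-out would presumably be handled by MapReduceComputation itself; the loop here only stands in for that step.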

Local execution

Run in-process with a thread pool:
result = pipeline.run(run_id="run_xxx", mode="local")
Test on a subset first:
result = pipeline.run(
    run_id="run_xxx",
    mode="local",
    limit_traces=10,
    persist_results=False,
)
Parameter         What it limits
limit_traces      First N traces
limit_datapoints  Traces from the first N datapoints
limit_steps       Traces from the first N steps
Only one limit can be set at a time. Subset mode automatically disables writes.

The default mode, "modal", runs on Modal for parallel cloud execution:
result = pipeline.run(run_id="run_xxx", mode="modal")
Modal execution handles batching, checkpointing, and distribution across cloud workers. An interrupted run resumes automatically when pipeline.run() is called again with the same run_id.

Persistence

With persist_results=True (the default), Manta:
  1. Registers a judge and columns with Kestrel
  2. Writes computed metadata to ClickHouse’s entity_metadata table
  3. Tracks progress for resumability
Set persist_results=False for in-memory results only.

Pipeline result

pipeline.run() returns a dict[EntityLevel, list[MetadataOutput]]:
result = pipeline.run(run_id="run_xxx")

for metric in result[EntityLevel.TRACE]:
    print(metric)
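Because the result is keyed by level, a quick sanity check is to count the outputs at each level before inspecting individual records. The sketch below uses a stand-in EntityLevel enum and plain dicts in place of MetadataOutput, whose fields are not documented here; both stand-ins and the count_by_level helper are assumptions for illustration.

```python
from enum import Enum


class EntityLevel(Enum):
    """Stand-in for manta.EntityLevel so the sketch runs without Manta."""
    TRACE = "trace"
    DATAPOINT = "datapoint"


def count_by_level(result: dict) -> dict:
    """Return the number of outputs produced at each entity level."""
    return {level.name: len(outputs) for level, outputs in result.items()}


# Made-up result with the same shape as dict[EntityLevel, list[...]].
result = {
    EntityLevel.TRACE: [{"failed": True}, {"failed": False}],
    EntityLevel.DATAPOINT: [{"failure_rate": 0.5}],
}

counts = count_by_level(result)
for level_name, count in counts.items():
    print(f"{level_name}: {count} outputs")

# Use .get() for levels the pipeline may not have defined.
missing_level_outputs = result.get("not_a_level", [])
```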

Environment variables

Variable             Required         Description
CLICKHOUSE_URL       Yes              ClickHouse HTTP endpoint
CLICKHOUSE_USER      Yes              ClickHouse user
CLICKHOUSE_PASSWORD  Yes              ClickHouse password
KESTREL_API_URL      For persistence  Kestrel API URL
KESTREL_API_KEY      For persistence  Kestrel API key
MODAL_TOKEN_ID       For Modal        Modal authentication
MODAL_TOKEN_SECRET   For Modal        Modal authentication
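
A preflight check can catch a missing variable before a run starts. The sketch below encodes the table above using only the standard library; the missing_env helper is hypothetical and not part of Manta, and it assumes an empty value counts as missing.

```python
import os

# Variable groups taken from the table above.
ALWAYS_REQUIRED = ["CLICKHOUSE_URL", "CLICKHOUSE_USER", "CLICKHOUSE_PASSWORD"]
FOR_PERSISTENCE = ["KESTREL_API_URL", "KESTREL_API_KEY"]
FOR_MODAL = ["MODAL_TOKEN_ID", "MODAL_TOKEN_SECRET"]


def missing_env(mode="modal", persist_results=True, env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    needed = list(ALWAYS_REQUIRED)
    if persist_results:
        needed += FOR_PERSISTENCE
    if mode == "modal":
        needed += FOR_MODAL
    return [name for name in needed if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env(mode="local", persist_results=False)
    if missing:
        raise SystemExit("Missing environment variables: " + ", ".join(missing))
```

Calling missing_env() with no arguments checks the strictest case, a Modal run with persistence enabled.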