Install

pip install -e "path/to/manta"
Requires Python 3.12+.

Environment variables

Set the following to connect to ClickHouse:
export CLICKHOUSE_URL="https://your-clickhouse-host:8443"
export CLICKHOUSE_USER="default"
export CLICKHOUSE_PASSWORD="..."
export CLICKHOUSE_DATABASE="default"

Load traces

Use MantaService to query traces for a run:
from manta.data import MantaService

service = MantaService()

traces = service.load_traces(run_id="run_xxx")
print(f"Loaded {len(traces)} traces")
Filter by phase, step, or specific trace IDs:
eval_traces = service.load_traces(run_id="run_xxx", phase="eval")
step_traces = service.load_traces(run_id="run_xxx", step_numbers=[0, 1])
specific = service.load_traces(run_id="run_xxx", trace_ids=["trc_abc", "trc_def"])
Each trace is a TraceRow with summary fields like score, status, cost_usd, duration_ms, and turn_count.
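These summary fields let you slice a run without loading any spans. As an illustration, here is a minimal, self-contained sketch using a hypothetical stand-in for TraceRow (the real class has more fields; only the summary fields named above are modeled):

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical stand-in for TraceRow, modeling only the summary
# fields listed above. Field types are assumptions for illustration.
@dataclass
class TraceRow:
    trace_id: str
    score: Optional[float]
    status: str
    cost_usd: float
    duration_ms: int
    turn_count: int

traces = [
    TraceRow("trc_a", 1.0, "completed", 0.012, 8_400, 3),
    TraceRow("trc_b", 0.0, "error", 0.002, 1_100, 1),
    TraceRow("trc_c", None, "completed", 0.031, 72_000, 9),
]

# Slice the run by summary fields alone -- no span loading needed.
failed = [t for t in traces if t.status == "error"]
slow = [t for t in traces if t.duration_ms > 60_000]
mean_cost = sum(t.cost_usd for t in traces) / len(traces)
```

The same comprehensions apply unchanged to real TraceRow objects returned by load_traces().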

Load spans

Load spans for your traces to access messages, tool calls, and grader results:
trace_ids = [t.trace_id for t in traces]
spans_by_trace = service.load_spans_for_traces(trace_ids)
Filter spans by kind to load only what you need:
spans_by_trace = service.load_spans_for_traces(
    trace_ids,
    span_kinds=["message", "tool"],
)
load_spans_for_traces() returns a dict[str, list[SpanRow]] keyed by trace ID.
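A common follow-up is to tally span kinds per trace from that mapping. A sketch with a hypothetical SpanRow stand-in (the field name `kind` is an assumption inferred from the `span_kinds` filter above):

```python
from collections import Counter
from dataclasses import dataclass

# Hypothetical stand-in for SpanRow; `kind` is an assumed field name.
@dataclass
class SpanRow:
    span_id: str
    kind: str

# Shape of load_spans_for_traces(): dict[str, list[SpanRow]] keyed by trace ID.
spans_by_trace = {
    "trc_abc": [SpanRow("sp_1", "message"), SpanRow("sp_2", "tool"), SpanRow("sp_3", "message")],
    "trc_def": [SpanRow("sp_4", "tool")],
}

# Count how many spans of each kind every trace produced.
kind_counts = {
    tid: Counter(s.kind for s in spans)
    for tid, spans in spans_by_trace.items()
}
```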

Use TraceContext

Wrap a trace and its spans in a TraceContext for typed access:
from manta import TraceContext

for trace in traces:
    ctx = TraceContext(trace, span_loader=lambda tid: spans_by_trace.get(tid, []))

    print(f"Trace {ctx.trace_id}: score={ctx.trace.score}")

    for msg in ctx.messages(role="assistant"):
        print(f"  Assistant: {msg.content[:100]}")

    for tool in ctx.tools():
        print(f"  Tool: {tool.name}, error={tool.error}")

    grader = ctx.grader_result()
    if grader:
        print(f"  Grader: score={grader.score}")
        for criterion in grader.criteria:
            print(f"    {criterion.criterion_name}: passed={criterion.passed}")
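Beyond printing, the per-criterion results lend themselves to simple aggregation. A sketch with hypothetical stand-ins mirroring only the grader fields used above:

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins modeling just the fields the loop above touches.
@dataclass
class Criterion:
    criterion_name: str
    passed: bool

@dataclass
class GraderResult:
    score: float
    criteria: list = field(default_factory=list)

grader = GraderResult(0.5, [Criterion("accuracy", True), Criterion("tone", False)])

# Fraction of criteria that passed (bools sum as 0/1).
pass_fraction = sum(c.passed for c in grader.criteria) / len(grader.criteria)
```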

Build a batch analysis

To run an analysis function across every trace in a run, use MapReduceComputation. Define an output schema, a map function, and optionally a reduce function:
from manta import (
    EntityLevel, MapReduceComputation,
    MetadataOutput, ReduceInput, TraceContext,
)

class TraceMetrics(MetadataOutput):
    tool_count: int
    passed: bool
    score: float

class DatapointMetrics(MetadataOutput):
    avg_tool_count: float
    pass_rate: float

def map_trace(ctx: TraceContext) -> TraceMetrics:
    grader = ctx.grader_result()
    return TraceMetrics(
        tool_count=len(ctx.tools()),
        passed=(grader.score == 1) if grader else False,
        score=ctx.trace.score or 0.0,
    )

def reduce_datapoint(inp: ReduceInput[TraceMetrics]) -> DatapointMetrics:
    return DatapointMetrics(
        avg_tool_count=inp.avg("tool_count"),
        pass_rate=inp.count_where(lambda o: o.passed) / len(inp),
    )

pipeline = (
    MapReduceComputation("My analysis", project_id="prj_xxx")
    .add_level(EntityLevel.TRACE, map_trace, TraceMetrics)
    .add_level(EntityLevel.DATAPOINT, reduce_datapoint, DatapointMetrics)
)

result = pipeline.run(run_id="run_xxx", mode="local", limit_traces=10)
See output schemas and execution for more on pipelines.
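To make the reduce step above concrete, here is a rough stand-in for the ReduceInput helpers, assuming the semantics their names and usage suggest (avg() takes a field name, count_where() a predicate, len() gives the number of mapped outputs). This is a sketch of assumed behavior, not the real implementation:

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, List, TypeVar

T = TypeVar("T")

# Hypothetical stand-in for ReduceInput, approximating the helpers
# used in reduce_datapoint() above.
class ReduceInput(Generic[T]):
    def __init__(self, outputs: Iterable[T]):
        self.outputs: List[T] = list(outputs)

    def __len__(self) -> int:
        return len(self.outputs)

    def avg(self, field_name: str) -> float:
        # Mean of a numeric attribute across all mapped outputs.
        return sum(getattr(o, field_name) for o in self.outputs) / len(self.outputs)

    def count_where(self, pred: Callable[[T], bool]) -> int:
        # Number of outputs satisfying the predicate.
        return sum(1 for o in self.outputs if pred(o))

@dataclass
class Metrics:
    tool_count: int
    passed: bool

inp = ReduceInput([Metrics(2, True), Metrics(4, False), Metrics(6, True)])
avg_tool_count = inp.avg("tool_count")          # 4.0
pass_rate = inp.count_where(lambda o: o.passed) / len(inp)
```

With these semantics, reduce_datapoint() above is just a mean and a pass rate over the per-trace TraceMetrics outputs.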