When using MapReduce pipelines, you define output schemas to specify the columns each level produces. Subclass MetadataOutput and declare fields with type annotations.

Basic schema

from manta import MetadataOutput

class TraceMetrics(MetadataOutput):
    tool_count: int
    passed: bool
    score: float
    summary: str
Supported types:

Python type | Column type
int | int
float | float
bool | bool
str | string
dict | json
list[str] | enum_set
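As a rough illustration of how annotation-driven schemas work, the sketch below inspects type annotations on a plain Python class (a stand-in for MetadataOutput, not manta's actual implementation) and maps them to column types per the table above:

```python
from typing import get_type_hints

# Stand-in schema class: only the annotations matter for this sketch.
class TraceMetrics:
    tool_count: int
    passed: bool
    score: float
    summary: str

# Assumed Python-type -> column-type mapping, taken from the table above.
COLUMN_TYPES = {int: "int", float: "float", bool: "bool", str: "string", dict: "json"}

# Derive the column layout from the class annotations.
columns = {name: COLUMN_TYPES[tp] for name, tp in get_type_hints(TraceMetrics).items()}
print(columns)
# {'tool_count': 'int', 'passed': 'bool', 'score': 'float', 'summary': 'string'}
```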

Annotated column types

Use Annotated with markers for special behaviors:
from typing import Annotated
from manta import MetadataOutput, Enum, JsonColumn, WithReasoning

class TraceMetrics(MetadataOutput):
    category: Annotated[str, Enum("good", "fair", "poor")]
    tags: Annotated[list[str], Enum("bug", "feature", "perf")]
    details: Annotated[dict, JsonColumn()]
    score: Annotated[float, WithReasoning()]
  • Enum(...) — restrict a string to allowed values
  • JsonColumn() — store complex nested data
  • WithReasoning() — attach reasoning to a column
Set reasoning in your compute function:
output = TraceMetrics(score=0.8, category="good", ...)
output.set_reasoning("score", "High tool success rate")
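Conceptually, per-column reasoning is just an extra string stored alongside the field value. The following is a minimal sketch of that pattern (not manta's implementation), with a hypothetical Output class:

```python
# Hypothetical stand-in: values and per-field reasoning kept side by side.
class Output:
    def __init__(self, **values):
        self.values = values
        self.reasoning = {}

    def set_reasoning(self, field, text):
        # Only fields that exist on the output can carry reasoning.
        if field not in self.values:
            raise KeyError(f"unknown field: {field}")
        self.reasoning[field] = text

out = Output(score=0.8, category="good")
out.set_reasoning("score", "High tool success rate")
print(out.reasoning)  # {'score': 'High tool success rate'}
```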

ReduceInput

Reduce functions receive a ReduceInput[T] wrapping the child outputs. It behaves like a list with aggregation helpers:
Method | Return type | Description
avg(field) | float | Average of a numeric field
sum(field) | float | Sum of a numeric field
count_where(predicate) | int | Count of outputs matching the predicate
filter(predicate) | list[T] | Outputs matching the predicate
get_values(field) | list | Values of a field extracted from all outputs

from manta import ReduceInput

# DatapointMetrics is the parent level's output schema
def reduce_fn(inp: ReduceInput[TraceMetrics]) -> DatapointMetrics:
    return DatapointMetrics(
        avg_tool_count=inp.avg("tool_count"),
        pass_rate=inp.count_where(lambda o: o.passed) / len(inp),
    )
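To make the helper semantics concrete, here is a self-contained sketch of a list-like wrapper with the aggregation helpers above. This is an illustration of the behavior, not manta's ReduceInput implementation, and the TraceMetrics dataclass is a stand-in:

```python
from dataclasses import dataclass

# Stand-in for a child-level output schema.
@dataclass
class TraceMetrics:
    tool_count: int
    passed: bool

# Minimal sketch: a thin wrapper over a list of child outputs.
class ReduceInput:
    def __init__(self, outputs):
        self._outputs = outputs

    def __len__(self):
        return len(self._outputs)

    def get_values(self, field):
        # Extract one field from every child output.
        return [getattr(o, field) for o in self._outputs]

    def avg(self, field):
        return sum(self.get_values(field)) / len(self)

    def sum(self, field):
        return float(sum(self.get_values(field)))

    def count_where(self, predicate):
        return sum(1 for o in self._outputs if predicate(o))

    def filter(self, predicate):
        return [o for o in self._outputs if predicate(o)]

inp = ReduceInput([TraceMetrics(3, True), TraceMetrics(5, False)])
print(inp.avg("tool_count"))                           # 4.0
print(inp.count_where(lambda o: o.passed) / len(inp))  # 0.5
```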