Chain Modular
Table of Contents
- Overview
- Architecture & Design Philosophy
- Quick Start
- Module: Core
- Module: Chains
- Module: Registry
- Module: Utils
- Operator Reference
- Error Handling & Resilience
- Full Integration Examples
Overview
chain is a production-grade, modular pipeline for composing arbitrary processing steps — LLM calls, data transforms, I/O operations, and more — into structured, observable, and resilient execution graphs.
Core capabilities at a glance:
| Feature | Description |
|---|---|
| Sequential pipelines | SequentialChain or a >> b >> c operator |
| Parallel fan-out | ParallelChain with named or indexed branches |
| Conditional routing | ConditionalChain with runtime key-based branching |
| Pure transforms | TransformChain wraps any sync/async callable |
| Retries & fallbacks | ChainConfig — per-chain retry count, delay, and fallback chain |
| Timeouts | Per-chain execution timeout via ChainConfig.timeout |
| Observability | Structured execution tree via ExecutionTracer / TraceSpan |
| Shared state | ChainContext mutable whiteboard shared across pipeline steps |
| Caching | CachingChain and cached decorator for result memoisation |
| Dynamic registration | ChainRegistry for plugin-style class lookup and instantiation |
| Declarative config | DeclarativeChainBuilder builds pipelines from dicts, JSON, or YAML |
Architecture & Design Philosophy
┌─────────────────────────────────────────────────────────────────────┐
│ chain framework │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ core/ │ │
│ │ BaseChain ← SequentialChain, ParallelChain, │ │
│ │ ConditionalChain, TransformChain, │ │
│ │ CachingChain │ │
│ │ │ │
│ │ ChainConfig ChainContext ChainInput ChainOutput │ │
│ │ ExecutionTracer TraceSpan │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │
│ │ chains/ │ │ registry/ │ │ utils/ │ │
│ │ SequentialChain │ │ ChainRegistry │ │ cached │ │
│ │ ParallelChain │ │ register() │ │ CachingChain │ │
│ │ ConditionalChain│ │ get() / build() │ │ Declarative │ │
│ │ TransformChain │ │ @chain decorator│ │ ChainBuilder │ │
│ └──────────────────┘ └──────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘Key design decisions:
run()is the public API. It handles wrapping, timing, retries, fallbacks, and tracing. Subclasses only implement_aexecute().- Async-first. All execution is rooted in
async def _aexecute(). The synchronousrun()method bridges intoasyncioautomatically. >>operator flattens composition:a >> b >> cproduces a single flatSequentialChain([a, b, c]), not nested chains.- Tracer propagation. Parent chains push their tracer instance to each child so spans nest correctly in a single unified tree.
- Non-breaking partial failures.
ParallelChain(fail_fast=False)collects all results including errors rather than aborting on first failure.
Quick Start
from fennec_community.chain import (
TransformChain, SequentialChain, ParallelChain,
ConditionalChain, ChainConfig, ChainContext,
CachingChain, ChainRegistry, DeclarativeChainBuilder
)
import asyncio
# 1. Simple transform pipeline using >> operator
strip = TransformChain(str.strip, name="strip")
upper = TransformChain(str.upper, name="upper")
pipeline = strip >> upper
result = asyncio.run(pipeline.arun(" hello world "))
print(result) # "HELLO WORLD"
# 2. Parallel fan-out
parallel = ParallelChain({
"stripped": strip,
"uppercased": upper,
})
result = asyncio.run(parallel.arun(" hello "))
print(result) # {"stripped": "hello", "uppercased": " HELLO "}
# 3. Conditional routing
def router(data): return "upper" if data.islower() else "strip"
conditional = ConditionalChain(
condition=router,
branches={"upper": upper, "strip": strip}
)
result = asyncio.run(conditional.arun("hello"))
print(result) # "HELLO"Module: Core
ChainConfig
ChainConfig is a dataclass that centralises per-chain resilience settings: retries, delays, timeouts, fallbacks, and logging verbosity. Every BaseChain instance carries a ChainConfig and consults it during every execution.
from fennec_community.chain import ChainConfig
cfg = ChainConfig(
retries=3,
retry_delay=1.0,
timeout=10.0,
fallback_chain=my_backup_chain,
verbose=True
)Fields
| Field | Type | Default | Description |
|---|---|---|---|
retries |
int |
0 |
Number of additional execution attempts after the first failure. 0 means no retry (one attempt total). Total attempts = retries + 1. |
retry_delay |
float |
0.5 |
Seconds to wait between consecutive retry attempts. Applied via asyncio.sleep. |
timeout |
float | None |
None |
Maximum wall-clock seconds allowed for a single _aexecute() call. None means no timeout is enforced. When exceeded, the chain returns a failed ChainOutput with a descriptive error message. |
fallback_chain |
BaseChain | None |
None |
A chain to invoke if all retries are exhausted and the primary chain still fails. The fallback receives the original ChainInput. If None, the last exception is re-raised to the caller. |
verbose |
bool |
False |
When True, emits DEBUG-level log messages including input, output, and execution time for every successful run. |
Validation: __post_init__ raises ValueError if retries < 0, retry_delay < 0, or timeout <= 0 (when not None).
ChainInput
ChainInput is a typed dataclass that wraps the data flowing into a chain, along with metadata and the shared context bag.
from fennec_community.chain import ChainInput
inp = ChainInput(
data={"query": "hello"},
metadata={"user_id": "u123"},
)
inp.context["intent"] = "greet"Fields
| Field | Type | Default | Description |
|---|---|---|---|
data |
Any |
required | The primary payload passed to the chain's _execute / _aexecute method. Can be any Python object — string, dict, dataclass, etc. |
metadata |
Dict[str, Any] |
{} |
Arbitrary key-value pairs attached by the caller. Accumulated and merged across sequential steps. |
context |
Dict[str, Any] |
{} |
Mutable shared bag that persists across the entire pipeline. Can hold a plain dict or a ChainContext instance. |
timestamp |
float |
time.time() |
Unix epoch seconds at object creation. Auto-populated. |
Note: When you call BaseChain.run(data, key=value), the framework automatically wraps data into a ChainInput with metadata={"key": value}. You can also pass a pre-built ChainInput directly — it is used as-is without re-wrapping.
ChainOutput
ChainOutput is a typed dataclass that wraps the data flowing out of a chain, including execution metadata and success/error state.
from fennec_community.chain import ChainOutput
out = ChainOutput(
data={"result": "HELLO"},
success=True,
execution_time=0.003
)Fields
| Field | Type | Default | Description |
|---|---|---|---|
data |
Any |
required | The result produced by the chain. Passed as the data of the next ChainInput in a sequential pipeline. |
metadata |
Dict[str, Any] |
{} |
Enriched metadata from the chain. Merged into the accumulated metadata dict in SequentialChain. |
execution_time |
float |
0.0 |
Wall-clock duration in seconds, set by BaseChain._timed_execute. |
success |
bool |
True |
True if the chain completed without error. A False value causes SequentialChain to short-circuit and return immediately. |
error |
str | None |
None |
Human-readable error description. Populated when success=False. |
chain_id |
str | None |
None |
Set automatically by BaseChain._guarded_run to the executing chain's chain_id. |
ChainContext
ChainContext is a mutable, session-scoped shared whiteboard that every chain in a pipeline can read from and write to. It flows through the context field of ChainInput without being re-wrapped at each step, enabling rich cross-chain communication without tight coupling.
It supports both a flat default namespace (via __getitem__/__setitem__) and named sub-namespaces (via set(key, value, namespace="...")) to prevent key collisions between chains.
from fennec_community.chain import ChainContext
ctx = ChainContext(session_id="sess-001", user_role="admin")
ctx["intent"] = "purchase"
ctx.set("cart_total", 99.9, namespace="shop")
print(ctx["intent"]) # "purchase"
print(ctx.get("cart_total", namespace="shop")) # 99.9
print(ctx.get("missing", default=0)) # 0ChainContext.__init__
ChainContext(session_id: Optional[str] = None, **initial_values: Any)Constructs a new context with an optional session identifier and arbitrary initial key-value pairs.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
session_id |
str | None |
None |
A human-readable or externally assigned session identifier. A random UUID is generated if not provided. Available as ctx.session_id. |
**initial_values |
Any |
— | Any keyword arguments are immediately stored in the default namespace. E.g. ChainContext(user="alice") sets ctx["user"] = "alice". |
ChainContext.get
get(key: str, default: Any = None, *, namespace: Optional[str] = None) -> AnyRetrieves a value from the context, optionally from a named sub-namespace.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
key |
str |
required | The key to look up. |
default |
Any |
None |
Value returned if key is not found in the target namespace. |
namespace |
str | None |
None |
When provided, looks up the key in the named sub-namespace instead of the default store. If the namespace itself does not exist, default is returned. |
Returns
Any — The stored value, or default if not found.
Example
ctx.set("price", 49.99, namespace="checkout")
price = ctx.get("price", namespace="checkout") # 49.99
missing = ctx.get("price") # None — not in default nsChainContext.set
set(key: str, value: Any, *, namespace: Optional[str] = None) -> NoneStores a value in the context, optionally under a named sub-namespace. Using namespaces prevents key collisions between unrelated chains in the same pipeline.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
key |
str |
required | The key to write. |
value |
Any |
required | The value to store. |
namespace |
str | None |
None |
When provided, writes into a separate dict keyed by this namespace name. Creates the namespace dict automatically if it does not exist. |
Returns
None
Example
ctx.set("token", "abc123", namespace="auth")
ctx.set("token", "xyz789", namespace="api")
# Both coexist without collision
print(ctx.get("token", namespace="auth")) # "abc123"
print(ctx.get("token", namespace="api")) # "xyz789"ChainContext.update
update(mapping: Dict[str, Any]) -> NoneBulk-updates the default namespace from a dictionary. Equivalent to calling ctx[k] = v for each key-value pair.
Parameters
| Parameter | Type | Description |
|---|---|---|
mapping |
Dict[str, Any] |
A dictionary whose key-value pairs are merged into the default namespace. Existing keys are overwritten. |
Returns
None
Example
ctx.update({"step": "payment", "amount": 120.0, "currency": "USD"})ChainContext.as_dict
as_dict() -> Dict[str, Any]Returns a shallow copy of the default namespace as a plain dict. Useful for serialisation, logging, or passing context state to external systems.
Parameters
None.
Returns
Dict[str, Any] — A new dict with all key-value pairs from the default namespace. Mutations to the returned dict do not affect the context.
Example
snapshot = ctx.as_dict()
print(json.dumps(snapshot)) # safe to serialiseTraceSpan
TraceSpan is a single node in the execution trace tree. Each chain execution creates one span; nested chains (e.g., steps inside SequentialChain) create child spans attached to the parent. Spans record timing, success/failure, input/output summaries, and arbitrary metadata.
Normally you do not construct TraceSpan directly — they are created automatically by ExecutionTracer.span(). However, you can read and inspect spans after execution.
Key Properties & Methods
| Member | Type | Description |
|---|---|---|
span_id |
str |
Short 8-character random ID for this span. |
chain_name |
str |
Name of the chain that owns this span. |
duration_ms |
float (property) |
Elapsed wall-clock time in milliseconds. Computed as (end_time - start_time) * 1000. Returns live value if the span is not yet finished. |
success |
bool |
True if the span finished without exception. |
error |
str | None |
Error message if success=False. |
input_summary |
str |
First 120 characters of the input data (string-coerced). |
output_summary |
str |
First 120 characters of the output data (string-coerced). |
children |
List[TraceSpan] |
Child spans created by nested chain executions. |
to_dict() |
Dict |
Serialises the span and all its children to a JSON-safe dict. |
pretty(indent) |
str |
Human-readable indented tree with ✅/❌ status and timing. |
TraceSpan.to_dict
to_dict() -> Dict[str, Any]Serialises the span and its full child tree to a JSON-safe dictionary. Ideal for shipping traces to external observability systems (Datadog, OpenTelemetry, custom dashboards).
Parameters
None.
Returns
Dict[str, Any] with keys: span_id, chain, duration_ms, success, error, input, output, metadata, and steps (list of child span dicts).
Example
import json
trace_dict = tracer.root.to_dict()
print(json.dumps(trace_dict, indent=2))TraceSpan.pretty
pretty(indent: int = 0) -> strReturns a human-readable, indented string representation of the span tree. Each line shows ✅ or ❌, the chain name, and elapsed milliseconds.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
indent |
int |
0 |
Number of leading spaces for this level. Nested calls use indent + 4. |
Returns
str — Multi-line tree string.
Example
print(tracer.root.pretty())
# ✅ [SequentialChain] 12.3ms
# ✅ [strip] 0.1ms
# ✅ [upper] 0.2msExecutionTracer
ExecutionTracer manages the execution span tree during a chain run. It maintains a stack of active spans; each span() context manager pushes/pops the stack and automatically attaches new spans as children of the current active span.
from fennec_community.chain import ExecutionTracer
tracer = ExecutionTracer()
with tracer.span("MyChain") as span:
span.set_input("hello")
result = do_something()
span.set_output(result)
print(tracer.pretty())ExecutionTracer.span
@contextmanager
span(chain_name: str) -> Generator[TraceSpan, None, None]Context manager that opens a new TraceSpan, links it into the execution tree, yields it for annotation, and closes it on exit (marking success or failure automatically based on whether an exception was raised).
Parameters
| Parameter | Type | Description |
|---|---|---|
chain_name |
str |
The name label for this span, typically the chain's .name attribute. |
Returns
TraceSpan — Yielded inside the with block. Call span.set_input(data) and span.set_output(result) to annotate it.
Tree construction logic:
- If the stack is empty, the new span becomes
tracer.root. - If the stack is non-empty, the new span is attached as a child of the current top-of-stack span.
- On exception inside the block, the span is marked
success=Falseand the exception is re-raised.
Example
tracer = ExecutionTracer()
with tracer.span("Pipeline") as root_span:
root_span.set_input("raw input")
with tracer.span("Step1") as child_span:
child_span.set_output("step1 result")
root_span.set_output("final result")
print(tracer.pretty())
# ✅ [Pipeline] 5.2ms
# ✅ [Step1] 1.1msExecutionTracer.to_dict
to_dict() -> Dict[str, Any]Returns the entire execution tree rooted at tracer.root as a JSON-safe dict. Returns an empty dict {} if no spans have been recorded yet.
Parameters
None.
Returns
Dict[str, Any] — The full tree as returned by TraceSpan.to_dict().
Example
import json
print(json.dumps(tracer.to_dict(), indent=2))ExecutionTracer.pretty
pretty() -> strReturns the entire execution tree as a human-readable multi-line string. Returns "(no trace)" if no spans have been recorded.
Parameters
None.
Returns
str — Indented execution tree with ✅/❌ status and timing for each chain.
Example
print(tracer.pretty())
# ✅ [SequentialChain] 14.7ms
# ✅ [strip_whitespace] 0.2ms
# ✅ [to_upper] 0.1ms
# ✅ [ParallelChain] 8.3ms
# ✅ [summary_chain] 7.1ms
# ✅ [sentiment_chain] 6.8msBaseChain
BaseChain is the abstract root class for all chains. It implements the full execution lifecycle — wrapping, retries, timeouts, tracing, and metrics — in _guarded_run(). Subclasses only implement the async _aexecute() hook (or the synchronous _execute() hook for purely synchronous work).
You extend BaseChain to create custom chain types:
from fennec_community.chain import BaseChain, ChainInput, ChainOutput, ChainConfig
class MyLLMChain(BaseChain):
def __init__(self, llm, **kwargs):
super().__init__(name="LLMChain", config=ChainConfig(retries=2, timeout=30.0), **kwargs)
self.llm = llm
async def _aexecute(self, inp: ChainInput) -> ChainOutput:
answer = await self.llm.agenerate(inp.data)
return ChainOutput(data=answer, success=True)BaseChain.__init__
BaseChain(
name: Optional[str] = None,
config: Optional[ChainConfig] = None,
tracer: Optional[ExecutionTracer] = None
)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str | None |
None |
Human-readable label for this chain instance. Used in log messages and trace spans. Defaults to the class name if not provided. |
config |
ChainConfig | None |
None |
Per-chain resilience settings. A default ChainConfig() (no retries, no timeout) is created if not provided. |
tracer |
ExecutionTracer | None |
None |
Shared execution tracer. A new ExecutionTracer() is created per instance if not provided. Pass a shared tracer to all chains in a pipeline for a unified trace tree. |
Auto-assigned attributes:
chain_id—"{name}-{uuid[:8]}"— a unique identifier for this chain instance._metrics—defaultdict(int)counting"success"and"error"executions._history— list of execution records (input, output, timing, success/error).
BaseChain.run
run(input_data: Any, **kwargs: Any) -> AnySynchronous public entry point. Wraps input_data into a ChainInput, delegates to _guarded_run (via the event loop), and returns the raw data from the resulting ChainOutput.
This method is the synchronous bridge for environments that cannot use await. In async contexts, prefer arun() to avoid event-loop conflicts.
Parameters
| Parameter | Type | Description |
|---|---|---|
input_data |
Any |
The data to process. If already a ChainInput, used as-is. Otherwise wrapped automatically. |
**kwargs |
Any |
Extra keyword arguments stored in ChainInput.metadata. |
Returns
Any — The data field of the successful ChainOutput. Not the ChainOutput object itself.
Raises
- Re-raises the last caught exception if all retries are exhausted and no
fallback_chainis configured.
Example
result = my_chain.run("hello world", source="api", user_id="u42")
print(result) # processed outputBaseChain.arun
async arun(input_data: Any, **kwargs: Any) -> AnyAsync public entry point. Functionally identical to run() but uses await throughout, making it safe for use in async def functions, FastAPI handlers, and async pipelines.
Preferred over run() in all async contexts.
Parameters
| Parameter | Type | Description |
|---|---|---|
input_data |
Any |
The data to process. If already a ChainInput, used as-is. |
**kwargs |
Any |
Extra keyword arguments stored in ChainInput.metadata. |
Returns
Any — The data field of the successful ChainOutput.
Raises
- Re-raises the last caught exception if all retries are exhausted and no
fallback_chainis configured.
Example
# In an async function
result = await my_chain.arun("hello world")
# As the entry point of a script
import asyncio
result = asyncio.run(my_chain.arun("hello world"))BaseChain.__call__
__call__(input_data: Any, **kwargs: Any) -> AnyMakes any chain callable as a function. Internally delegates to run().
Parameters
Same as run().
Returns
Same as run().
Example
# These three are equivalent:
result = my_chain.run("data")
result = my_chain("data")
result = asyncio.run(my_chain.arun("data"))BaseChain.__rshift__ (>>)
__rshift__(other: BaseChain) -> SequentialChainComposes two chains into a SequentialChain using the >> operator. If the left operand is already a SequentialChain, the new chain is appended to its flat list (no nesting). This means a >> b >> c always produces SequentialChain([a, b, c]), never SequentialChain([SequentialChain([a, b]), c]).
Parameters
| Parameter | Type | Description |
|---|---|---|
other |
BaseChain |
The chain to execute after self. |
Returns
SequentialChain — A new sequential chain containing self followed by other.
Example
step1 = TransformChain(str.strip)
step2 = TransformChain(str.upper)
step3 = TransformChain(str.split)
pipeline = step1 >> step2 >> step3
# equivalent to SequentialChain([step1, step2, step3])
result = asyncio.run(pipeline.arun(" hello world "))
# ["HELLO", "WORLD"]BaseChain.__or__ (|)
__or__(other: BaseChain) -> SequentialChainAlias for >>. Enables pipeline composition with the | operator. Identical behaviour to __rshift__.
Example
pipeline = step1 | step2 | step3BaseChain.get_stats
get_stats() -> Dict[str, Any]Returns a snapshot of execution statistics for this chain instance. Counts are accumulated across all run() / arun() calls since the chain was created.
Parameters
None.
Returns
Dict[str, Any] with the following keys:
| Key | Type | Description |
|---|---|---|
chain_id |
str |
Unique identifier of this chain instance. |
name |
str |
Human-readable name. |
total_executions |
int |
Total number of run() / arun() calls. |
success_count |
int |
Number of calls that completed without error. |
error_count |
int |
Number of calls that exhausted all retries. |
success_rate |
float |
success_count / total_executions. 0.0 if no executions. |
avg_execution_time_ms |
float |
Average wall-clock execution time in milliseconds across all successful runs. |
Example
stats = my_chain.get_stats()
print(stats)
# {
# 'chain_id': 'MyChain-a1b2c3d4',
# 'name': 'MyChain',
# 'total_executions': 100,
# 'success_count': 98,
# 'error_count': 2,
# 'success_rate': 0.98,
# 'avg_execution_time_ms': 42.7
# }BaseChain.get_trace
get_trace() -> Dict[str, Any]Returns the most recent execution trace as a JSON-safe dictionary. The trace includes all nested child spans from the last run() or arun() call.
Parameters
None.
Returns
Dict[str, Any] — The root TraceSpan.to_dict() output. Returns {} if the chain has never been run or the tracer has no root span.
Example
import json
await my_pipeline.arun("test input")
trace = my_pipeline.get_trace()
print(json.dumps(trace, indent=2))
# {
# "span_id": "a1b2c3d4",
# "chain": "SequentialChain",
# "duration_ms": 12.3,
# "success": true,
# "steps": [
# {"span_id": "...", "chain": "strip", ...},
# {"span_id": "...", "chain": "upper", ...}
# ]
# }Module: Chains
SequentialChain
SequentialChain executes a list of chains one after another, piping the output data of each step as the input data of the next. Metadata is accumulated and merged across steps. If any step returns success=False, the pipeline short-circuits and returns immediately with that error.
from fennec_community.chain import SequentialChain, TransformChain
import asyncio
pipeline = SequentialChain([
TransformChain(str.strip),
TransformChain(str.upper),
TransformChain(str.split),
], name="text_pipeline")
result = asyncio.run(pipeline.arun(" hello world "))
print(result) # ["HELLO", "WORLD"]SequentialChain.__init__
SequentialChain(
chains: List[BaseChain],
name: str = "SequentialChain",
config: Optional[ChainConfig] = None,
tracer: Optional[ExecutionTracer] = None
)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
chains |
List[BaseChain] |
required | Ordered list of chains to execute. Execution order follows the list order exactly. |
name |
str |
"SequentialChain" |
Human-readable label for this pipeline. |
config |
ChainConfig | None |
None |
Resilience settings (retries, timeout, fallback) applied to the sequential pipeline as a whole — not to individual steps. Each step uses its own config. |
tracer |
ExecutionTracer | None |
None |
Shared tracer. When set, propagated to every sub-chain so child spans appear in the same tree. |
SequentialChain.add
add(chain: BaseChain) -> "SequentialChain"Appends a chain to the end of the step list and returns self, enabling a fluent builder pattern.
Parameters
| Parameter | Type | Description |
|---|---|---|
chain |
BaseChain |
The chain to append. Executed after all currently registered steps. |
Returns
SequentialChain — Returns self for method chaining.
Example
pipeline = SequentialChain([])
pipeline.add(step1).add(step2).add(step3)
# Equivalent to:
pipeline = SequentialChain([step1, step2, step3])ParallelChain
ParallelChain executes all branches concurrently using asyncio.gather, feeding each branch the same input. Results are aggregated into a dict (named branches) or list (indexed branches). Highly effective for fan-out patterns where the same input needs to be processed by multiple independent chains.
from fennec_community.chain import ParallelChain, TransformChain
import asyncio
parallel = ParallelChain({
"original": TransformChain(lambda x: x),
"upper": TransformChain(str.upper),
"words": TransformChain(str.split),
})
result = asyncio.run(parallel.arun("hello world"))
print(result)
# {
# "original": "hello world",
# "upper": "HELLO WORLD",
# "words": ["hello", "world"]
# }ParallelChain.__init__
ParallelChain(
chains: Union[Dict[str, BaseChain], List[BaseChain]],
name: str = "ParallelChain",
fail_fast: bool = True,
config: Optional[ChainConfig] = None,
tracer: Optional[ExecutionTracer] = None
)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
chains |
Dict[str, BaseChain] | List[BaseChain] |
required | Either a dict mapping branch labels to chains (result is a dict), or a list of chains (result is a list in the same order). |
name |
str |
"ParallelChain" |
Human-readable label. |
fail_fast |
bool |
True |
When True: If any branch raises an exception, the entire asyncio.gather is cancelled and the exception propagates. When False: All branches run to completion regardless of failures; results include None for failed branches, and output.metadata["branch_errors"] contains the error messages. The overall output is success=True if at least one branch succeeded. |
config |
ChainConfig | None |
None |
Resilience settings for the parallel container chain. |
tracer |
ExecutionTracer | None |
None |
Shared tracer propagated to all branches. |
Example — fail_fast=False (partial results):
parallel = ParallelChain(
{"fast": fast_chain, "slow": potentially_failing_chain},
fail_fast=False
)
result = await parallel.arun("data")
# result.data = {"fast": "ok", "slow": None}
# result.metadata["branch_errors"] = {"slow": "timeout"}ConditionalChain
ConditionalChain evaluates a condition function at runtime to select and execute one of several named branches. This enables dynamic routing: the same pipeline can follow different paths based on the actual input data.
from fennec_community.chain import ConditionalChain, TransformChain
import asyncio
def router(data: dict) -> str:
return "high" if data.get("score", 0) > 0.7 else "low"
chain = ConditionalChain(
condition=router,
branches={
"high": TransformChain(lambda d: f"HIGH PRIORITY: {d}"),
"low": TransformChain(lambda d: f"Normal: {d}"),
},
default=TransformChain(lambda d: f"Unknown: {d}")
)
result = asyncio.run(chain.arun({"score": 0.9, "text": "urgent"}))
print(result) # "HIGH PRIORITY: {'score': 0.9, 'text': 'urgent'}"ConditionalChain.__init__
ConditionalChain(
condition: Callable[[Any], str],
branches: Dict[str, BaseChain],
default: Optional[BaseChain] = None,
name: str = "ConditionalChain",
config: Optional[ChainConfig] = None,
tracer: Optional[ExecutionTracer] = None
)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
condition |
Callable[[Any], str] |
required | A callable that receives input_data.data and returns a str key. The key is looked up in branches. If the condition raises an exception, a failed ChainOutput is returned immediately. |
branches |
Dict[str, BaseChain] |
required | Dictionary mapping string route keys to chains. Only one branch is executed per call — the one whose key matches the condition's return value. |
default |
BaseChain | None |
None |
Fallback chain executed when the condition returns a key not present in branches. If None and the key is missing, ChainOutput(success=False) is returned with a descriptive error. |
name |
str |
"ConditionalChain" |
Human-readable label. |
config |
ChainConfig | None |
None |
Resilience settings for the router itself (not individual branches). |
tracer |
ExecutionTracer | None |
None |
Shared tracer. Propagated to the selected branch. |
Routing metadata: The selected route key is stored in output.metadata["selected_route"] for auditability.
TransformChain
TransformChain is the simplest concrete chain: it wraps any synchronous or asynchronous callable and applies it to input_data.data. Use it to integrate any pure function into a pipeline without boilerplate.
from fennec_community.chain import TransformChain
import json, asyncio
parse_json = TransformChain(json.loads, name="json_parser")
to_upper = TransformChain(str.upper, name="to_upper")
word_count = TransformChain(lambda s: len(s.split()), name="word_count")
pipeline = parse_json >> TransformChain(lambda d: d["text"]) >> to_upper
result = asyncio.run(pipeline.arun('{"text": "hello world"}'))
print(result) # "HELLO WORLD"TransformChain.__init__
TransformChain(
transform_fn: Callable[[Any], Any],
name: Optional[str] = None,
config: Optional[ChainConfig] = None,
tracer: Optional[ExecutionTracer] = None
)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
transform_fn |
Callable[[Any], Any] |
required | The transformation function. Receives one positional argument (input_data.data) and must return the transformed value. Can be sync or async (async def). Coroutine functions are detected and awaited automatically. |
name |
str | None |
None |
Human-readable label. Defaults to transform_fn.__name__ if available, otherwise "TransformChain". |
config |
ChainConfig | None |
None |
Resilience settings. |
tracer |
ExecutionTracer | None |
None |
Shared tracer. |
Error handling: Any exception raised by transform_fn is caught and returned as ChainOutput(success=False, error="TransformChain '{name}' raised: {exc}") — it does not propagate until BaseChain._guarded_run decides whether to retry.
Module: Registry
ChainRegistry
ChainRegistry is a global class-level registry (not an instance — all methods are @classmethod) that maps string aliases to BaseChain subclasses. It enables plugin-style dynamic chain lookup and instantiation without importing specific classes. The four built-in chain types are pre-registered at import time.
Pre-registered aliases:
| Alias | Class |
|---|---|
"sequential" |
SequentialChain |
"parallel" |
ParallelChain |
"conditional" |
ConditionalChain |
"transform" |
TransformChain |
from fennec_community.chain import ChainRegistry
# Look up and instantiate a registered chain
cls = ChainRegistry.get("transform")
chain = cls(str.upper)
# Or in one call
chain = ChainRegistry.build("transform", str.upper)ChainRegistry.register
@classmethod
register(alias: str, chain_cls: Type[BaseChain]) -> NoneRegisters a BaseChain subclass under a string alias. If the alias already exists, a warning is logged and the previous registration is overwritten.
Parameters
| Parameter | Type | Description |
|---|---|---|
alias |
str |
Unique string key to associate with the class. Convention: lowercase with underscores (e.g., "my_llm_chain"). |
chain_cls |
Type[BaseChain] |
The class object (not an instance) to register. Must be a subclass of BaseChain. |
Returns
None
Raises
TypeError— ifchain_clsis not a class or is not a subclass ofBaseChain.
Example
class MyLLMChain(BaseChain):
...
ChainRegistry.register("llm", MyLLMChain)ChainRegistry.chain
@classmethod
chain(alias: str) -> Callable[[Type[BaseChain]], Type[BaseChain]]A class decorator that registers the decorated class under alias at definition time. The class is returned unchanged — decoration is transparent to the class itself.
Parameters
| Parameter | Type | Description |
|---|---|---|
alias |
str |
The registry key under which the class will be registered. |
Returns
A decorator that accepts and returns a Type[BaseChain].
Example
@ChainRegistry.chain("sentiment")
class SentimentChain(BaseChain):
async def _aexecute(self, inp):
score = my_model.predict(inp.data)
return ChainOutput(data=score, success=True)
# Now accessible:
chain = ChainRegistry.build("sentiment")ChainRegistry.get
@classmethod
get(alias: str) -> Type[BaseChain]Returns the class registered under alias. Does not instantiate it.
Parameters
| Parameter | Type | Description |
|---|---|---|
alias |
str |
The registered alias to look up. |
Returns
Type[BaseChain] — The registered class object.
Raises
KeyError— with a message listing all available aliases ifaliasis not registered.
Example
TransformCls = ChainRegistry.get("transform")
chain = TransformCls(str.upper, name="my_upper")ChainRegistry.build
@classmethod
build(alias: str, *args: Any, **kwargs: Any) -> BaseChainLooks up the class registered under alias and instantiates it with the provided arguments. Combines get() and class construction in one call.
Parameters
| Parameter | Type | Description |
|---|---|---|
alias |
str |
The registered alias to look up and instantiate. |
*args |
Any |
Positional arguments forwarded to the class constructor. |
**kwargs |
Any |
Keyword arguments forwarded to the class constructor. |
Returns
BaseChain — A new instance of the registered class.
Raises
KeyError— ifaliasis not registered (same asget()).
Example
transform_chain = ChainRegistry.build("transform", str.strip, name="strip_ws")
seq = ChainRegistry.build("sequential", [step1, step2], name="my_pipeline")ChainRegistry.list_all
@classmethod
list_all() -> Dict[str, str]Returns a dictionary mapping every registered alias to its corresponding class name. Useful for introspection, debugging, and building dynamic UI menus.
Parameters
None.
Returns
Dict[str, str] — {alias: class_name} for all registered entries.
Example
print(ChainRegistry.list_all())
# {
# 'sequential': 'SequentialChain',
# 'parallel': 'ParallelChain',
# 'conditional': 'ConditionalChain',
# 'transform': 'TransformChain',
# 'sentiment': 'SentimentChain', ← custom registration
# }ChainRegistry.unregister
@classmethod
unregister(alias: str) -> NoneRemoves the alias from the registry. Silently does nothing if the alias is not present. Primarily intended for testing to clean up custom registrations.
Parameters
| Parameter | Type | Description |
|---|---|---|
alias |
str |
The alias to remove. |
Returns
None
ChainRegistry.clear
@classmethod
clear() -> NoneWipes all entries from the registry, including the built-in chains. After clear(), all aliases — including "sequential", "parallel", "conditional", and "transform" — are gone. Primarily intended for test isolation.
Parameters
None.
Returns
None
Warning: Calling clear() in production code removes the built-in chain registrations. Use unregister() for targeted removals.
Module: Utils
cached
cached(fn: Callable) -> CallableA decorator that adds in-memory memoisation to any synchronous callable. The cache key is a SHA-256 hash of the JSON-serialised arguments (args + kwargs). Unhashable or non-serialisable arguments fall back to a direct (non-cached) call rather than raising an error.
from fennec_community.chain import cached
@cached
def expensive_lookup(query: str) -> str:
return slow_database.fetch(query)
result1 = expensive_lookup("hello") # executes the function
result2 = expensive_lookup("hello") # returns from cache ⚡Parameters
| Parameter | Type | Description |
|---|---|---|
fn |
Callable |
Any synchronous callable to memoize. The original function's __name__, __doc__, and other attributes are preserved via functools.wraps. |
Returns
Callable — The wrapped, cache-enabled function. The wrapper additionally exposes a cache_clear() method to wipe the cache store.
Cache invalidation:
expensive_lookup.cache_clear() # wipes all cached entries for this functionNote: The cache is per-function-instance and lives in the closure. It is not shared across calls in different modules or processes. It is also not thread-safe for concurrent writes. For thread-safe or distributed caching, use CachingChain with an external backend.
CachingChain
CachingChain wraps any BaseChain instance and adds an in-memory result cache keyed by a SHA-256 hash of the serialised input data. Cache hits bypass the wrapped chain entirely. Ideal for expensive operations like LLM calls that may receive the same input repeatedly.
from fennec_community.chain import CachingChain
slow_chain = MyExpensiveLLMChain()
cached_chain = CachingChain(slow_chain)
result1 = await cached_chain.arun("What is AI?") # executes slow_chain
result2 = await cached_chain.arun("What is AI?") # returns from cache ⚡
print(cached_chain.cache_stats)
# {"hits": 1, "misses": 1, "size": 1}CachingChain.__init__
CachingChain(
chain: BaseChain,
name: Optional[str] = None,
config: Optional[ChainConfig] = None,
tracer: Optional[ExecutionTracer] = None
)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
chain |
BaseChain |
required | The chain whose results should be cached. |
name |
str | None |
None |
Human-readable label. Defaults to "Cached({chain.name})". |
config |
ChainConfig | None |
None |
Resilience settings for the caching wrapper itself. |
tracer |
ExecutionTracer | None |
None |
Shared tracer. Propagated to the inner chain on cache misses. |
CachingChain.cache_clear
cache_clear() -> NoneClears all cached entries and resets the hit_count and miss_count counters to zero. Use this to force re-execution of the inner chain (e.g., after the underlying data changes).
Parameters
None.
Returns
None
Example
cached_chain.cache_clear()
# All subsequent calls will miss the cache and execute the inner chainCachingChain.cache_stats (property)
@property
cache_stats -> Dict[str, int]Returns a snapshot of cache performance counters.
Returns
Dict[str, int] with keys:
| Key | Description |
|---|---|
hits |
Number of times a result was served from cache. |
misses |
Number of times the inner chain was executed (cache miss). |
size |
Current number of entries in the cache store. |
Example
stats = cached_chain.cache_stats
print(f"Hit rate: {stats['hits'] / (stats['hits'] + stats['misses']):.1%}")DeclarativeChainBuilder
DeclarativeChainBuilder enables building complete chain pipelines from plain Python dicts, JSON strings, or YAML strings — without writing any Python class definitions. This is particularly useful for configuration-driven architectures, no-code tooling, or dynamic pipeline construction from external config files.
All methods are @staticmethod — no instance is needed.
from fennec_community.chain import DeclarativeChainBuilder
import asyncio
spec = {
"type": "sequential",
"name": "text_pipeline",
"steps": [
{"type": "transform", "fn_name": "strip"},
{"type": "transform", "fn_name": "upper"},
]
}
pipeline = DeclarativeChainBuilder.from_dict(
spec,
fn_registry={"strip": str.strip, "upper": str.upper}
)
result = asyncio.run(pipeline.arun(" hello world "))
print(result) # "HELLO WORLD"DeclarativeChainBuilder.from_dict
@staticmethod
from_dict(
spec: Dict[str, Any],
fn_registry: Optional[Dict[str, Callable]] = None,
chain_registry: Optional[Dict[str, Type[BaseChain]]] = None
) -> BaseChainBuilds a chain from a plain Python dictionary specification. The method is recursive — nested steps and branches dicts are resolved by calling from_dict on each nested spec.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
spec |
Dict[str, Any] |
required | The chain specification dictionary. Must contain a "type" key. See spec format below. |
fn_registry |
Dict[str, Callable] | None |
None |
Maps fn_name strings in transform specs to actual callable objects. Required when using TransformChain steps with named functions. |
chain_registry |
Dict[str, Type[BaseChain]] | None |
None |
Maps custom type strings to chain classes, extending the global ChainRegistry. |
Returns
BaseChain — The constructed chain or pipeline ready for execution.
Raises
ValueError— iftypeis unknown, or if atransformstep references afn_namenot infn_registry.
Spec format reference:
# Sequential pipeline
{
"type": "sequential",
"name": "my_pipeline", # optional
"steps": [ # list of nested specs
{"type": "transform", "fn_name": "strip"},
{"type": "transform", "fn_name": "upper"},
]
}
# Parallel fan-out (named branches → dict result)
{
"type": "parallel",
"branches": {
"summary": {"type": "transform", "fn_name": "summarize"},
"sentiment": {"type": "transform", "fn_name": "analyze"},
}
}
# Parallel fan-out (list branches → list result)
{
"type": "parallel",
"branches": [
{"type": "transform", "fn_name": "fn_a"},
{"type": "transform", "fn_name": "fn_b"},
]
}
# Transform step — callable by value
{"type": "transform", "fn": my_callable_object}
# Transform step — callable by name from fn_registry
{"type": "transform", "fn_name": "my_fn", "name": "optional_label"}
# Any alias registered in ChainRegistry
{"type": "sentiment", "name": "sentiment_step"}Example
spec = {
"type": "sequential",
"steps": [
{"type": "transform", "fn_name": "strip"},
{
"type": "parallel",
"branches": {
"upper": {"type": "transform", "fn_name": "upper"},
"length": {"type": "transform", "fn_name": "length"},
}
}
]
}
pipeline = DeclarativeChainBuilder.from_dict(spec, fn_registry={
"strip": str.strip,
"upper": str.upper,
"length": len,
})
result = asyncio.run(pipeline.arun(" hello "))
print(result) # {"upper": "HELLO", "length": 5}DeclarativeChainBuilder.from_json
@staticmethod
from_json(
json_str: str,
fn_registry: Optional[Dict[str, Callable]] = None,
chain_registry: Optional[Dict[str, Type[BaseChain]]] = None
) -> BaseChainParses a JSON string into a spec dictionary, then delegates to from_dict. Use for configuration stored in JSON files, API payloads, or environment variables.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
json_str |
str |
required | A valid JSON string whose top-level object is a chain spec dictionary. |
fn_registry |
Dict[str, Callable] | None |
None |
Function registry forwarded to from_dict. |
chain_registry |
Dict[str, Type[BaseChain]] | None |
None |
Chain class registry forwarded to from_dict. |
Returns
BaseChain — The constructed pipeline.
Raises
json.JSONDecodeError— ifjson_stris not valid JSON.ValueError— for unknown chain types or missing function references (same asfrom_dict).
Example
json_spec = '''
{
"type": "sequential",
"steps": [
{"type": "transform", "fn_name": "strip"},
{"type": "transform", "fn_name": "upper"}
]
}
'''
pipeline = DeclarativeChainBuilder.from_json(
json_spec,
fn_registry={"strip": str.strip, "upper": str.upper}
)
result = asyncio.run(pipeline.arun(" hello "))
print(result) # "HELLO"DeclarativeChainBuilder.from_yaml
@staticmethod
from_yaml(
yaml_str: str,
fn_registry: Optional[Dict[str, Callable]] = None,
chain_registry: Optional[Dict[str, Type[BaseChain]]] = None
) -> BaseChainParses a YAML string into a spec dictionary, then delegates to from_dict. Requires pyyaml to be installed. Use for human-authored configuration files.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
yaml_str |
str |
required | A valid YAML string whose top-level mapping is a chain spec. |
fn_registry |
Dict[str, Callable] | None |
None |
Function registry forwarded to from_dict. |
chain_registry |
Dict[str, Type[BaseChain]] | None |
None |
Chain class registry forwarded to from_dict. |
Returns
BaseChain — The constructed pipeline.
Raises
ImportError— ifpyyamlis not installed. Message:"pyyaml is required for YAML support: pip install pyyaml".ValueError— for unknown chain types or missing function references.
Example
yaml_spec = """
type: sequential
name: text_pipeline
steps:
- type: transform
fn_name: strip
- type: parallel
branches:
upper:
type: transform
fn_name: upper
words:
type: transform
fn_name: split
"""
pipeline = DeclarativeChainBuilder.from_yaml(
yaml_spec,
fn_registry={
"strip": str.strip,
"upper": str.upper,
"split": str.split,
}
)
result = asyncio.run(pipeline.arun(" hello world "))
print(result) # {"upper": "HELLO WORLD", "words": ["hello", "world"]}Operator Reference
| Operator | Method | Behaviour |
|---|---|---|
a >> b |
__rshift__ |
Sequential composition: run a, then b. Returns SequentialChain([a, b]). |
a | b |
__or__ |
Alias for >>. Identical behaviour. |
chain(data) |
__call__ |
Calls chain.run(data). Makes any chain callable as a function. |
a >> b >> c |
Flattened >> |
Produces SequentialChain([a, b, c]) — never nested SequentialChain. |
Error Handling & Resilience
Retry with back-off
from chain import ChainConfig
# 3 retries, 1-second delay between attempts
cfg = ChainConfig(retries=3, retry_delay=1.0)
chain = MyChain(config=cfg)Timeout
cfg = ChainConfig(timeout=5.0) # fail after 5 seconds
chain = MyChain(config=cfg)When a timeout fires, the chain returns ChainOutput(success=False, error="Timeout after 5.01s (limit 5.0s)").
Fallback chain
from fennec_community.chain import ChainConfig, TransformChain
fallback = TransformChain(lambda _: "default answer", name="fallback")
cfg = ChainConfig(retries=2, fallback_chain=fallback)
primary = MyUnreliableChain(config=cfg)
# If primary fails 3 times, fallback chain is called with the original input
result = await primary.arun("query")Partial failures in ParallelChain
parallel = ParallelChain(
{"branch_a": chain_a, "branch_b": chain_b},
fail_fast=False # collect all results, don't abort on first failure
)
output = await parallel.arun("data")
if output.metadata.get("branch_errors"):
print("Some branches failed:", output.metadata["branch_errors"])Full Integration Examples
Example 1 — Custom LLM Chain with Retries, Tracing, and Stats
import asyncio
from fennec_community.chain import BaseChain, ChainInput, ChainOutput, ChainConfig, ExecutionTracer
class LLMChain(BaseChain):
def __init__(self, llm, **kwargs):
super().__init__(
name="LLMChain",
config=ChainConfig(retries=2, timeout=30.0, verbose=True),
**kwargs
)
self.llm = llm
async def _aexecute(self, inp: ChainInput) -> ChainOutput:
answer = await self.llm.agenerate(inp.data)
return ChainOutput(data=answer, success=True)
llm_chain = LLMChain(my_llm)
result = asyncio.run(llm_chain.arun("What is machine learning?"))
print(result)
# Observability
print(llm_chain.get_stats())
import json
print(json.dumps(llm_chain.get_trace(), indent=2))Example 2 — Multi-Stage Pipeline with Context
import asyncio
from fennec_community.chain import TransformChain, SequentialChain, ChainInput, ChainContext
# Build context
ctx = ChainContext(session_id="session-001")
ctx["user_tier"] = "premium"
# Build pipeline
pipeline = (
TransformChain(str.strip, name="strip")
>> TransformChain(str.lower, name="lowercase")
>> TransformChain(str.split, name="tokenize")
)
# Pass context via ChainInput
inp = ChainInput(data=" Hello World ", context=ctx)
result = asyncio.run(pipeline.arun(inp))
print(result) # ["hello", "world"]
# Shared context is available to all steps
print(ctx["user_tier"]) # "premium" — still accessibleExample 3 — Parallel Fan-Out then Sequential Merge
import asyncio
from fennec_community.chain import TransformChain, ParallelChain, SequentialChain
# Stage 1: parallel enrichment
enrich = ParallelChain({
"upper": TransformChain(str.upper),
"words": TransformChain(str.split),
"length": TransformChain(len),
})
# Stage 2: merge results
def merge(d): return f"TEXT: {d['upper']} | WORDS: {d['words']} | LEN: {d['length']}"
merge_chain = TransformChain(merge, name="merge")
pipeline = enrich >> merge_chain
result = asyncio.run(pipeline.arun("hello world"))
print(result)
# "TEXT: HELLO WORLD | WORDS: ['hello', 'world'] | LEN: 11"Example 4 — Conditional Routing Pipeline
import asyncio
from fennec_community.chain import ConditionalChain, TransformChain, SequentialChain
def classify(data: str) -> str:
return "question" if data.strip().endswith("?") else "statement"
question_chain = TransformChain(lambda t: f"[Q] {t}", name="question_handler")
statement_chain = TransformChain(lambda t: f"[S] {t}", name="statement_handler")
router = ConditionalChain(
condition=classify,
branches={"question": question_chain, "statement": statement_chain},
default=TransformChain(lambda t: f"[?] {t}", name="unknown_handler")
)
pipeline = TransformChain(str.strip) >> router
print(asyncio.run(pipeline.arun("What is AI? "))) # [Q] What is AI?
print(asyncio.run(pipeline.arun("AI is powerful."))) # [S] AI is powerful.Example 5 — Caching Chain for Expensive Calls
import asyncio
from fennec_community.chain import CachingChain
expensive_chain = MyLLMChain()
cached = CachingChain(expensive_chain)
q = "Explain quantum computing"
r1 = asyncio.run(cached.arun(q)) # executes LLM
r2 = asyncio.run(cached.arun(q)) # cache hit ⚡
r3 = asyncio.run(cached.arun(q)) # cache hit ⚡
print(cached.cache_stats) # {"hits": 2, "misses": 1, "size": 1}
cached.cache_clear() # force refresh
r4 = asyncio.run(cached.arun(q)) # executes LLM againExample 6 — Registry + Declarative Builder
import asyncio
from fennec_community.chain import ChainRegistry, DeclarativeChainBuilder, BaseChain, ChainInput, ChainOutput
# Register a custom chain
@ChainRegistry.chain("word_count")
class WordCountChain(BaseChain):
async def _aexecute(self, inp: ChainInput) -> ChainOutput:
return ChainOutput(data=len(inp.data.split()), success=True)
# Build from YAML config (loaded from file / env / API)
yaml_config = """
type: sequential
steps:
- type: transform
fn_name: strip
- type: word_count
"""
pipeline = DeclarativeChainBuilder.from_yaml(
yaml_config,
fn_registry={"strip": str.strip}
)
result = asyncio.run(pipeline.arun(" hello beautiful world "))
print(result) # 3
print(ChainRegistry.list_all())Example 7 — FastAPI Async Service
from fastapi import FastAPI
from fennec_community.chain import SequentialChain, TransformChain, CachingChain, ChainConfig
import asyncio
app = FastAPI()
pipeline = CachingChain(
SequentialChain([
TransformChain(str.strip, name="strip"),
TransformChain(str.upper, name="upper"),
], config=ChainConfig(retries=2, timeout=5.0)),
)
@app.post("/process")
async def process(text: str):
result = await pipeline.arun(text)
return {
"result": result,
"cache_stats": pipeline.cache_stats,
"chain_stats": pipeline.get_stats(),
}
@app.get("/trace")
async def get_trace():
return pipeline.get_trace()community/chain.md