
Chain Modular


Table of Contents

  1. Overview
  2. Architecture & Design Philosophy
  3. Quick Start
  4. Module: Core
  5. Module: Chains
  6. Module: Registry
  7. Module: Utils
  8. Operator Reference
  9. Error Handling & Resilience
  10. Full Integration Examples

Overview

chain is a production-grade, modular pipeline framework for composing arbitrary processing steps (LLM calls, data transforms, I/O operations, and more) into structured, observable, and resilient execution graphs.

Core capabilities at a glance:

| Feature | Description |
| --- | --- |
| Sequential pipelines | SequentialChain or the a >> b >> c operator |
| Parallel fan-out | ParallelChain with named or indexed branches |
| Conditional routing | ConditionalChain with runtime key-based branching |
| Pure transforms | TransformChain wraps any sync/async callable |
| Retries & fallbacks | ChainConfig: per-chain retry count, delay, and fallback chain |
| Timeouts | Per-chain execution timeout via ChainConfig.timeout |
| Observability | Structured execution tree via ExecutionTracer / TraceSpan |
| Shared state | ChainContext, a mutable whiteboard shared across pipeline steps |
| Caching | CachingChain and cached decorator for result memoisation |
| Dynamic registration | ChainRegistry for plugin-style class lookup and instantiation |
| Declarative config | DeclarativeChainBuilder builds pipelines from dicts, JSON, or YAML |

Architecture & Design Philosophy

┌─────────────────────────────────────────────────────────────────────┐
│                          chain framework                            │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  core/                                                       │   │
│  │  BaseChain ← SequentialChain, ParallelChain,                │   │
│  │              ConditionalChain, TransformChain,               │   │
│  │              CachingChain                                   │   │
│  │                                                              │   │
│  │  ChainConfig  ChainContext  ChainInput  ChainOutput          │   │
│  │  ExecutionTracer  TraceSpan                                  │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌──────────────────┐  ┌──────────────────┐  ┌─────────────────┐  │
│  │  chains/         │  │  registry/       │  │  utils/         │  │
│  │  SequentialChain │  │  ChainRegistry   │  │  cached         │  │
│  │  ParallelChain   │  │  register()      │  │  CachingChain   │  │
│  │  ConditionalChain│  │  get() / build() │  │  Declarative    │  │
│  │  TransformChain  │  │  @chain decorator│  │  ChainBuilder   │  │
│  └──────────────────┘  └──────────────────┘  └─────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘

Key design decisions:

  • run() is the public API. It handles wrapping, timing, retries, fallbacks, and tracing. Subclasses only implement _aexecute().
  • Async-first. All execution is rooted in async def _aexecute(). The synchronous run() method bridges into asyncio automatically.
  • >> operator flattens composition: a >> b >> c produces a single flat SequentialChain([a, b, c]), not nested chains.
  • Tracer propagation. Parent chains push their tracer instance to each child so spans nest correctly in a single unified tree.
  • Non-breaking partial failures. ParallelChain(fail_fast=False) collects all results including errors rather than aborting on first failure.

Quick Start

from fennec_community.chain import (
    TransformChain, SequentialChain, ParallelChain,
    ConditionalChain, ChainConfig, ChainContext,
    CachingChain, ChainRegistry, DeclarativeChainBuilder
)
import asyncio

# 1. Simple transform pipeline using >> operator
strip  = TransformChain(str.strip,  name="strip")
upper  = TransformChain(str.upper,  name="upper")
pipeline = strip >> upper

result = asyncio.run(pipeline.arun("  hello world  "))
print(result)  # "HELLO WORLD"

# 2. Parallel fan-out
parallel = ParallelChain({
    "stripped": strip,
    "uppercased": upper,
})
result = asyncio.run(parallel.arun("  hello  "))
print(result)  # {"stripped": "hello", "uppercased": "  HELLO  "}

# 3. Conditional routing
def router(data): return "upper" if data.islower() else "strip"
conditional = ConditionalChain(
    condition=router,
    branches={"upper": upper, "strip": strip}
)
result = asyncio.run(conditional.arun("hello"))
print(result)  # "HELLO"

Module: Core

ChainConfig

ChainConfig is a dataclass that centralises per-chain resilience settings: retries, delays, timeouts, fallbacks, and logging verbosity. Every BaseChain instance carries a ChainConfig and consults it during every execution.

from fennec_community.chain import ChainConfig

cfg = ChainConfig(
    retries=3,
    retry_delay=1.0,
    timeout=10.0,
    fallback_chain=my_backup_chain,  # placeholder: any BaseChain instance defined elsewhere
    verbose=True
)

Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| retries | int | 0 | Number of additional execution attempts after the first failure. 0 means no retry (one attempt total). Total attempts = retries + 1. |
| retry_delay | float | 0.5 | Seconds to wait between consecutive retry attempts. Applied via asyncio.sleep. |
| timeout | Optional[float] | None | Maximum wall-clock seconds allowed for a single _aexecute() call. None means no timeout is enforced. When exceeded, the chain returns a failed ChainOutput with a descriptive error message. |
| fallback_chain | Optional[BaseChain] | None | A chain to invoke if all retries are exhausted and the primary chain still fails. The fallback receives the original ChainInput. If None, the last exception is re-raised to the caller. |
| verbose | bool | False | When True, emits DEBUG-level log messages including input, output, and execution time for every successful run. |

Validation: __post_init__ raises ValueError if retries < 0, retry_delay < 0, or timeout <= 0 (when not None).
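The validation rule can be pictured with a small stand-in dataclass. This is only a sketch: ConfigSketch and its total_attempts property are hypothetical names, not part of the library, and only the three documented checks are modelled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConfigSketch:
    """Hypothetical stand-in for ChainConfig; models only the documented checks."""
    retries: int = 0
    retry_delay: float = 0.5
    timeout: Optional[float] = None

    def __post_init__(self) -> None:
        # Reject values the documentation describes as invalid.
        if self.retries < 0:
            raise ValueError("retries must be >= 0")
        if self.retry_delay < 0:
            raise ValueError("retry_delay must be >= 0")
        if self.timeout is not None and self.timeout <= 0:
            raise ValueError("timeout must be > 0 when set")

    @property
    def total_attempts(self) -> int:
        # One initial attempt plus the configured retries.
        return self.retries + 1
```

Under these assumptions, ConfigSketch(retries=3).total_attempts is 4, and ConfigSketch(timeout=0) raises ValueError at construction time rather than at first use.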


ChainInput

ChainInput is a typed dataclass that wraps the data flowing into a chain, along with metadata and the shared context bag.

from fennec_community.chain import ChainInput

inp = ChainInput(
    data={"query": "hello"},
    metadata={"user_id": "u123"},
)
inp.context["intent"] = "greet"

Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| data | Any | required | The primary payload passed to the chain's _execute / _aexecute method. Can be any Python object (string, dict, dataclass, etc.). |
| metadata | Dict[str, Any] | {} | Arbitrary key-value pairs attached by the caller. Accumulated and merged across sequential steps. |
| context | Dict[str, Any] | {} | Mutable shared bag that persists across the entire pipeline. Can hold a plain dict or a ChainContext instance. |
| timestamp | float | time.time() | Unix epoch seconds at object creation. Auto-populated. |

Note: When you call BaseChain.run(data, key=value), the framework automatically wraps data into a ChainInput with metadata={"key": value}. You can also pass a pre-built ChainInput directly — it is used as-is without re-wrapping.
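The wrapping rule in the note above can be sketched as follows. InputSketch and wrap_input are hypothetical names used for illustration; the library's own wrapping code is not shown in this document and may differ.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class InputSketch:
    """Hypothetical stand-in for ChainInput with the four documented fields."""
    data: Any
    metadata: Dict[str, Any] = field(default_factory=dict)
    context: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)


def wrap_input(input_data: Any, **kwargs: Any) -> InputSketch:
    # A pre-built input is passed through untouched.
    if isinstance(input_data, InputSketch):
        return input_data
    # Anything else becomes the payload; keyword arguments become metadata.
    return InputSketch(data=input_data, metadata=dict(kwargs))


wrapped = wrap_input("hello", user_id="u123")
```

Here wrapped.metadata is {"user_id": "u123"}, and calling wrap_input(wrapped) returns the very same object.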


ChainOutput

ChainOutput is a typed dataclass that wraps the data flowing out of a chain, including execution metadata and success/error state.

from fennec_community.chain import ChainOutput

out = ChainOutput(
    data={"result": "HELLO"},
    success=True,
    execution_time=0.003
)

Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| data | Any | required | The result produced by the chain. Passed as the data of the next ChainInput in a sequential pipeline. |
| metadata | Dict[str, Any] | {} | Enriched metadata from the chain. Merged into the accumulated metadata dict in SequentialChain. |
| execution_time | float | 0.0 | Wall-clock duration in seconds, set by BaseChain._timed_execute. |
| success | bool | True | True if the chain completed without error. A False value causes SequentialChain to short-circuit and return immediately. |
| error | Optional[str] | None | Human-readable error description. Populated when success=False. |
| chain_id | Optional[str] | None | Set automatically by BaseChain._guarded_run to the executing chain's chain_id. |

ChainContext

ChainContext is a mutable, session-scoped shared whiteboard that every chain in a pipeline can read from and write to. It flows through the context field of ChainInput without being re-wrapped at each step, enabling rich cross-chain communication without tight coupling.

It supports both a flat default namespace (via __getitem__/__setitem__) and named sub-namespaces (via set(key, value, namespace="...")) to prevent key collisions between chains.

from fennec_community.chain import ChainContext

ctx = ChainContext(session_id="sess-001", user_role="admin")
ctx["intent"] = "purchase"
ctx.set("cart_total", 99.9, namespace="shop")

print(ctx["intent"])                            # "purchase"
print(ctx.get("cart_total", namespace="shop"))  # 99.9
print(ctx.get("missing", default=0))            # 0

ChainContext.__init__

ChainContext(session_id: Optional[str] = None, **initial_values: Any)

Constructs a new context with an optional session identifier and arbitrary initial key-value pairs.

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| session_id | Optional[str] | None | A human-readable or externally assigned session identifier. A random UUID is generated if not provided. Available as ctx.session_id. |
| **initial_values | Any | n/a | Any keyword arguments are immediately stored in the default namespace. E.g. ChainContext(user="alice") sets ctx["user"] = "alice". |

ChainContext.get

get(key: str, default: Any = None, *, namespace: Optional[str] = None) -> Any

Retrieves a value from the context, optionally from a named sub-namespace.

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| key | str | required | The key to look up. |
| default | Any | None | Value returned if key is not found in the target namespace. |
| namespace | Optional[str] | None | When provided, looks up the key in the named sub-namespace instead of the default store. If the namespace itself does not exist, default is returned. |

Returns

Any — The stored value, or default if not found.

Example

ctx.set("price", 49.99, namespace="checkout")
price = ctx.get("price", namespace="checkout")   # 49.99
missing = ctx.get("price")                       # None — not in default ns

ChainContext.set

set(key: str, value: Any, *, namespace: Optional[str] = None) -> None

Stores a value in the context, optionally under a named sub-namespace. Using namespaces prevents key collisions between unrelated chains in the same pipeline.

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| key | str | required | The key to write. |
| value | Any | required | The value to store. |
| namespace | Optional[str] | None | When provided, writes into a separate dict keyed by this namespace name. Creates the namespace dict automatically if it does not exist. |

Returns

None

Example

ctx.set("token", "abc123", namespace="auth")
ctx.set("token", "xyz789", namespace="api")
# Both coexist without collision
print(ctx.get("token", namespace="auth"))  # "abc123"
print(ctx.get("token", namespace="api"))   # "xyz789"
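The split between the default namespace and named sub-namespaces can be approximated with two levels of dictionaries. ContextSketch below is a hypothetical stand-in, not the library's ChainContext; its internal attribute names are assumptions.

```python
import uuid
from typing import Any, Dict, Optional


class ContextSketch:
    """Hypothetical stand-in for ChainContext (get/set and item access only)."""

    def __init__(self, session_id: Optional[str] = None, **initial_values: Any) -> None:
        # Fall back to a random UUID when no session id is supplied.
        self.session_id = session_id or str(uuid.uuid4())
        self._default: Dict[str, Any] = dict(initial_values)
        self._namespaces: Dict[str, Dict[str, Any]] = {}

    def __getitem__(self, key: str) -> Any:
        return self._default[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._default[key] = value

    def get(self, key: str, default: Any = None, *, namespace: Optional[str] = None) -> Any:
        # A missing namespace behaves like an empty one, so `default` is returned.
        store = self._default if namespace is None else self._namespaces.get(namespace, {})
        return store.get(key, default)

    def set(self, key: str, value: Any, *, namespace: Optional[str] = None) -> None:
        if namespace is None:
            self._default[key] = value
        else:
            # Create the namespace dict on first write.
            self._namespaces.setdefault(namespace, {})[key] = value
```

Because each namespace owns its own dict, the same key can hold different values in "auth" and "api" while staying absent from the default namespace.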

ChainContext.update

update(mapping: Dict[str, Any]) -> None

Bulk-updates the default namespace from a dictionary. Equivalent to calling ctx[k] = v for each key-value pair.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| mapping | Dict[str, Any] | A dictionary whose key-value pairs are merged into the default namespace. Existing keys are overwritten. |

Returns

None

Example

ctx.update({"step": "payment", "amount": 120.0, "currency": "USD"})

ChainContext.as_dict

as_dict() -> Dict[str, Any]

Returns a shallow copy of the default namespace as a plain dict. Useful for serialisation, logging, or passing context state to external systems.

Parameters

None.

Returns

Dict[str, Any] — A new dict with all key-value pairs from the default namespace. Mutations to the returned dict do not affect the context.

Example

import json

snapshot = ctx.as_dict()
print(json.dumps(snapshot))  # works as long as every stored value is JSON-serialisable

TraceSpan

TraceSpan is a single node in the execution trace tree. Each chain execution creates one span; nested chains (e.g., steps inside SequentialChain) create child spans attached to the parent. Spans record timing, success/failure, input/output summaries, and arbitrary metadata.

You normally do not construct TraceSpan objects directly; they are created automatically by ExecutionTracer.span(). You can, however, read and inspect spans after execution.

Key Properties & Methods

| Member | Type | Description |
| --- | --- | --- |
| span_id | str | Short 8-character random ID for this span. |
| chain_name | str | Name of the chain that owns this span. |
| duration_ms | float (property) | Elapsed wall-clock time in milliseconds. Computed as (end_time - start_time) * 1000. Returns live value if the span is not yet finished. |
| success | bool | True if the span finished without exception. |
| error | Optional[str] | Error message if success=False. |
| input_summary | str | First 120 characters of the input data (string-coerced). |
| output_summary | str | First 120 characters of the output data (string-coerced). |
| children | List[TraceSpan] | Child spans created by nested chain executions. |
| to_dict() | Dict | Serialises the span and all its children to a JSON-safe dict. |
| pretty(indent) | str | Human-readable indented tree with ✅/❌ status and timing. |

TraceSpan.to_dict

to_dict() -> Dict[str, Any]

Serialises the span and its full child tree to a JSON-safe dictionary. Ideal for shipping traces to external observability systems (Datadog, OpenTelemetry, custom dashboards).

Parameters

None.

Returns

Dict[str, Any] with keys: span_id, chain, duration_ms, success, error, input, output, metadata, and steps (list of child span dicts).

Example

import json
trace_dict = tracer.root.to_dict()
print(json.dumps(trace_dict, indent=2))

TraceSpan.pretty

pretty(indent: int = 0) -> str

Returns a human-readable, indented string representation of the span tree. Each line shows ✅ or ❌, the chain name, and elapsed milliseconds.

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| indent | int | 0 | Number of leading spaces for this level. Nested calls use indent + 4. |

Returns

str — Multi-line tree string.

Example

print(tracer.root.pretty())
# ✅ [SequentialChain] 12.3ms
#     ✅ [strip] 0.1ms
#     ✅ [upper] 0.2ms

ExecutionTracer

ExecutionTracer manages the execution span tree during a chain run. It maintains a stack of active spans; each span() context manager pushes/pops the stack and automatically attaches new spans as children of the current active span.

from fennec_community.chain import ExecutionTracer

tracer = ExecutionTracer()
with tracer.span("MyChain") as span:
    span.set_input("hello")
    result = do_something()
    span.set_output(result)

print(tracer.pretty())

ExecutionTracer.span

@contextmanager
span(chain_name: str) -> Generator[TraceSpan, None, None]

Context manager that opens a new TraceSpan, links it into the execution tree, yields it for annotation, and closes it on exit (marking success or failure automatically based on whether an exception was raised).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| chain_name | str | The name label for this span, typically the chain's .name attribute. |

Returns

TraceSpan — Yielded inside the with block. Call span.set_input(data) and span.set_output(result) to annotate it.

Tree construction logic:

  • If the stack is empty, the new span becomes tracer.root.
  • If the stack is non-empty, the new span is attached as a child of the current top-of-stack span.
  • On exception inside the block, the span is marked success=False and the exception is re-raised.

Example

tracer = ExecutionTracer()
with tracer.span("Pipeline") as root_span:
    root_span.set_input("raw input")
    with tracer.span("Step1") as child_span:
        child_span.set_output("step1 result")
    root_span.set_output("final result")

print(tracer.pretty())
# ✅ [Pipeline] 5.2ms
#     ✅ [Step1] 1.1ms
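The stack-based tree construction described above can be sketched in a few dozen lines. SpanSketch and TracerSketch are hypothetical stand-ins; they model only the three tree rules and the indent + 4 pretty-printing, and the use of time.perf_counter is an assumption.

```python
import time
from contextlib import contextmanager
from typing import Iterator, List, Optional


class SpanSketch:
    """Hypothetical stand-in for TraceSpan."""

    def __init__(self, chain_name: str) -> None:
        self.chain_name = chain_name
        self.start_time = time.perf_counter()
        self.end_time: Optional[float] = None
        self.success = True
        self.error: Optional[str] = None
        self.children: List["SpanSketch"] = []

    @property
    def duration_ms(self) -> float:
        # Live value while the span is still open.
        end = self.end_time if self.end_time is not None else time.perf_counter()
        return (end - self.start_time) * 1000

    def pretty(self, indent: int = 0) -> str:
        mark = "✅" if self.success else "❌"
        lines = [f"{' ' * indent}{mark} [{self.chain_name}] {self.duration_ms:.1f}ms"]
        lines += [child.pretty(indent + 4) for child in self.children]
        return "\n".join(lines)


class TracerSketch:
    """Hypothetical stand-in for ExecutionTracer."""

    def __init__(self) -> None:
        self.root: Optional[SpanSketch] = None
        self._stack: List[SpanSketch] = []

    @contextmanager
    def span(self, chain_name: str) -> Iterator[SpanSketch]:
        new_span = SpanSketch(chain_name)
        if self._stack:
            self._stack[-1].children.append(new_span)  # child of the active span
        else:
            self.root = new_span  # empty stack: this span is the root
        self._stack.append(new_span)
        try:
            yield new_span
        except Exception as exc:
            new_span.success = False
            new_span.error = str(exc)
            raise  # the failure is recorded, then re-raised
        finally:
            new_span.end_time = time.perf_counter()
            self._stack.pop()
```

Keeping the active spans on a stack means a nested with block needs no reference to its parent: whatever span is on top when a new one opens becomes the parent.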

ExecutionTracer.to_dict

to_dict() -> Dict[str, Any]

Returns the entire execution tree rooted at tracer.root as a JSON-safe dict. Returns an empty dict {} if no spans have been recorded yet.

Parameters

None.

Returns

Dict[str, Any] — The full tree as returned by TraceSpan.to_dict().

Example

import json
print(json.dumps(tracer.to_dict(), indent=2))

ExecutionTracer.pretty

pretty() -> str

Returns the entire execution tree as a human-readable multi-line string. Returns "(no trace)" if no spans have been recorded.

Parameters

None.

Returns

str — Indented execution tree with ✅/❌ status and timing for each chain.

Example

print(tracer.pretty())
# ✅ [SequentialChain] 14.7ms
#     ✅ [strip_whitespace] 0.2ms
#     ✅ [to_upper] 0.1ms
#     ✅ [ParallelChain] 8.3ms
#         ✅ [summary_chain] 7.1ms
#         ✅ [sentiment_chain] 6.8ms

BaseChain

BaseChain is the abstract root class for all chains. It implements the full execution lifecycle — wrapping, retries, timeouts, tracing, and metrics — in _guarded_run(). Subclasses only implement the async _aexecute() hook (or the synchronous _execute() hook for purely synchronous work).

You extend BaseChain to create custom chain types:

from fennec_community.chain import BaseChain, ChainInput, ChainOutput, ChainConfig

class MyLLMChain(BaseChain):
    def __init__(self, llm, **kwargs):
        super().__init__(name="LLMChain", config=ChainConfig(retries=2, timeout=30.0), **kwargs)
        self.llm = llm

    async def _aexecute(self, inp: ChainInput) -> ChainOutput:
        answer = await self.llm.agenerate(inp.data)
        return ChainOutput(data=answer, success=True)

BaseChain.__init__

BaseChain(
    name: Optional[str] = None,
    config: Optional[ChainConfig] = None,
    tracer: Optional[ExecutionTracer] = None
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | Optional[str] | None | Human-readable label for this chain instance. Used in log messages and trace spans. Defaults to the class name if not provided. |
| config | Optional[ChainConfig] | None | Per-chain resilience settings. A default ChainConfig() (no retries, no timeout) is created if not provided. |
| tracer | Optional[ExecutionTracer] | None | Shared execution tracer. A new ExecutionTracer() is created per instance if not provided. Pass a shared tracer to all chains in a pipeline for a unified trace tree. |

Auto-assigned attributes:

  • chain_id: "{name}-{uuid[:8]}", a unique identifier for this chain instance.
  • _metrics: defaultdict(int) counting "success" and "error" executions.
  • _history: list of execution records (input, output, timing, success/error).

BaseChain.run

run(input_data: Any, **kwargs: Any) -> Any

Synchronous public entry point. Wraps input_data into a ChainInput, delegates to _guarded_run (via the event loop), and returns the raw data from the resulting ChainOutput.

This method is the synchronous bridge for environments that cannot use await. In async contexts, prefer arun() to avoid event-loop conflicts.
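The event-loop conflict mentioned above comes from a standard asyncio constraint: asyncio.run() cannot be called from a thread that already has a running loop. A minimal sketch of a synchronous bridge follows; run_sync and shout are hypothetical names, and the library's run() may handle the situation differently.

```python
import asyncio
from typing import Any, Callable, Coroutine


def run_sync(async_fn: Callable[..., Coroutine[Any, Any, Any]], *args: Any) -> Any:
    """Run async_fn(*args) to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one.
        return asyncio.run(async_fn(*args))
    # A loop is already running; blocking on it here would conflict with it.
    raise RuntimeError("already inside an event loop; await the async entry point instead")


async def shout(text: str) -> str:
    await asyncio.sleep(0)
    return text.upper()
```

Called from plain synchronous code, run_sync(shout, "hello") returns "HELLO"; called from inside a coroutine, it raises RuntimeError instead of attempting to start a second loop.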

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| input_data | Any | The data to process. If already a ChainInput, used as-is. Otherwise wrapped automatically. |
| **kwargs | Any | Extra keyword arguments stored in ChainInput.metadata. |

Returns

Any — The data field of the successful ChainOutput. Not the ChainOutput object itself.

Raises

  • Re-raises the last caught exception if all retries are exhausted and no fallback_chain is configured.

Example

result = my_chain.run("hello world", source="api", user_id="u42")
print(result)  # processed output

BaseChain.arun

async arun(input_data: Any, **kwargs: Any) -> Any

Async public entry point. Functionally identical to run() but uses await throughout, making it safe for use in async def functions, FastAPI handlers, and async pipelines.

Preferred over run() in all async contexts.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| input_data | Any | The data to process. If already a ChainInput, used as-is. |
| **kwargs | Any | Extra keyword arguments stored in ChainInput.metadata. |

Returns

Any — The data field of the successful ChainOutput.

Raises

  • Re-raises the last caught exception if all retries are exhausted and no fallback_chain is configured.
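The retry, timeout, and fallback behaviour can be sketched as one loop. This is an illustration under assumptions, not the library's _guarded_run: run_with_resilience is a hypothetical helper, and it treats a timeout as an ordinary exception to be retried, whereas the ChainConfig documentation describes a failed ChainOutput.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

AsyncStep = Callable[[Any], Awaitable[Any]]


async def run_with_resilience(
    step: AsyncStep,
    data: Any,
    *,
    retries: int = 0,
    retry_delay: float = 0.5,
    timeout: Optional[float] = None,
    fallback: Optional[AsyncStep] = None,
) -> Any:
    last_exc: Optional[Exception] = None
    for attempt in range(retries + 1):  # total attempts = retries + 1
        try:
            # With timeout=None, wait_for simply awaits the call.
            return await asyncio.wait_for(step(data), timeout)
        except Exception as exc:
            last_exc = exc
            if attempt < retries:
                await asyncio.sleep(retry_delay)
    if fallback is not None:
        # The fallback receives the original input.
        return await fallback(data)
    assert last_exc is not None
    raise last_exc  # no fallback configured: surface the last failure
```

With retries=2, a step that fails twice and then succeeds returns normally on the third attempt; a step that always fails hands control to the fallback, or re-raises if none is given.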

Example

# In an async function
result = await my_chain.arun("hello world")

# As the entry point of a script
import asyncio
result = asyncio.run(my_chain.arun("hello world"))

BaseChain.__call__

__call__(input_data: Any, **kwargs: Any) -> Any

Makes any chain callable as a function. Internally delegates to run().

Parameters

Same as run().

Returns

Same as run().

Example

# These three are equivalent:
result = my_chain.run("data")
result = my_chain("data")
result = asyncio.run(my_chain.arun("data"))

BaseChain.__rshift__ (>>)

__rshift__(other: BaseChain) -> SequentialChain

Composes two chains into a SequentialChain using the >> operator. If the left operand is already a SequentialChain, the new chain is appended to its flat list (no nesting). This means a >> b >> c always produces SequentialChain([a, b, c]), never SequentialChain([SequentialChain([a, b]), c]).

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| other | BaseChain | The chain to execute after self. |

Returns

SequentialChain — A new sequential chain containing self followed by other.

Example

step1 = TransformChain(str.strip)
step2 = TransformChain(str.upper)
step3 = TransformChain(str.split)

pipeline = step1 >> step2 >> step3
# equivalent to SequentialChain([step1, step2, step3])

result = asyncio.run(pipeline.arun("  hello world  "))
# ["HELLO", "WORLD"]
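The flattening rule can be reproduced with operator overloading on two small classes. Step and Seq are hypothetical stand-ins for BaseChain and SequentialChain; this sketch always returns a new Seq and does not flatten a Seq on the right-hand side, both of which are assumptions about behaviour the text does not spell out.

```python
from typing import Any, Callable, List


class Step:
    """Hypothetical stand-in for a single chain wrapping a plain function."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def run(self, data: Any) -> Any:
        return self.fn(data)

    def __rshift__(self, other: "Step") -> "Seq":
        return Seq([self, other])

    def __or__(self, other: "Step") -> "Seq":
        # `|` defers to `>>`, so subclasses that override `>>` are respected.
        return self >> other


class Seq(Step):
    """Hypothetical stand-in for SequentialChain."""

    def __init__(self, steps: List[Step]) -> None:
        self.steps = list(steps)

    def run(self, data: Any) -> Any:
        for step in self.steps:
            data = step.run(data)
        return data

    def __rshift__(self, other: Step) -> "Seq":
        # Extend the flat list instead of wrapping self in another Seq.
        return Seq(self.steps + [other])


strip, upper, split = Step(str.strip), Step(str.upper), Step(str.split)
pipeline = strip >> upper >> split
```

Because >> is left-associative, strip >> upper builds a two-step Seq first, and the second >> appends split to that flat list, so pipeline.steps holds exactly the three original steps.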

BaseChain.__or__ (|)

__or__(other: BaseChain) -> SequentialChain

Alias for >>. Enables pipeline composition with the | operator. Identical behaviour to __rshift__.

Example

pipeline = step1 | step2 | step3

BaseChain.get_stats

get_stats() -> Dict[str, Any]

Returns a snapshot of execution statistics for this chain instance. Counts are accumulated across all run() / arun() calls since the chain was created.

Parameters

None.

Returns

Dict[str, Any] with the following keys:

| Key | Type | Description |
| --- | --- | --- |
| chain_id | str | Unique identifier of this chain instance. |
| name | str | Human-readable name. |
| total_executions | int | Total number of run() / arun() calls. |
| success_count | int | Number of calls that completed without error. |
| error_count | int | Number of calls that exhausted all retries. |
| success_rate | float | success_count / total_executions. 0.0 if no executions. |
| avg_execution_time_ms | float | Average wall-clock execution time in milliseconds across all successful runs. |

Example

stats = my_chain.get_stats()
print(stats)
# {
#   'chain_id': 'MyChain-a1b2c3d4',
#   'name': 'MyChain',
#   'total_executions': 100,
#   'success_count': 98,
#   'error_count': 2,
#   'success_rate': 0.98,
#   'avg_execution_time_ms': 42.7
# }
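The derived fields can be computed from a list of execution records as sketched below. The record shape ({"success": bool, "seconds": float}) and the summarize helper are hypothetical; the document does not specify how _history entries are laid out.

```python
from typing import Any, Dict, List


def summarize(history: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate hypothetical records of the form {"success": bool, "seconds": float}."""
    total = len(history)
    ok_times = [record["seconds"] for record in history if record["success"]]
    return {
        "total_executions": total,
        "success_count": len(ok_times),
        "error_count": total - len(ok_times),
        # Guard against division by zero when nothing has run yet.
        "success_rate": len(ok_times) / total if total else 0.0,
        # Averaged over successful runs only, converted from seconds to ms.
        "avg_execution_time_ms": (sum(ok_times) / len(ok_times) * 1000) if ok_times else 0.0,
    }
```

An empty history yields a success_rate of 0.0 rather than an error, which matches the documented behaviour for a chain that has never run.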

BaseChain.get_trace

get_trace() -> Dict[str, Any]

Returns the most recent execution trace as a JSON-safe dictionary. The trace includes all nested child spans from the last run() or arun() call.

Parameters

None.

Returns

Dict[str, Any] — The root TraceSpan.to_dict() output. Returns {} if the chain has never been run or the tracer has no root span.

Example

import json

# Inside an async function:
await my_pipeline.arun("test input")
trace = my_pipeline.get_trace()
print(json.dumps(trace, indent=2))
# {
#   "span_id": "a1b2c3d4",
#   "chain": "SequentialChain",
#   "duration_ms": 12.3,
#   "success": true,
#   "steps": [
#     {"span_id": "...", "chain": "strip", ...},
#     {"span_id": "...", "chain": "upper", ...}
#   ]
# }

Module: Chains

SequentialChain

SequentialChain executes a list of chains one after another, piping the output data of each step as the input data of the next. Metadata is accumulated and merged across steps. If any step returns success=False, the pipeline short-circuits and returns immediately with that error.

from fennec_community.chain import SequentialChain, TransformChain
import asyncio

pipeline = SequentialChain([
    TransformChain(str.strip),
    TransformChain(str.upper),
    TransformChain(str.split),
], name="text_pipeline")

result = asyncio.run(pipeline.arun("  hello world  "))
print(result)  # ["HELLO", "WORLD"]
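The three rules stated for SequentialChain (piping, metadata merging, and short-circuiting) can be sketched with plain functions. Out and run_sequence are hypothetical stand-ins; in particular, letting later metadata keys overwrite earlier ones is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Out:
    """Hypothetical stand-in for ChainOutput."""
    data: Any
    metadata: Dict[str, Any] = field(default_factory=dict)
    success: bool = True
    error: Optional[str] = None


StepFn = Callable[[Any], Out]


def run_sequence(steps: List[StepFn], data: Any) -> Out:
    merged: Dict[str, Any] = {}
    for step in steps:
        out = step(data)
        merged.update(out.metadata)  # assumed: later steps overwrite earlier keys
        if not out.success:
            # Short-circuit: the remaining steps never run.
            return Out(data=out.data, metadata=merged, success=False, error=out.error)
        data = out.data  # the output of this step feeds the next one
    return Out(data=data, metadata=merged)
```

A failing step in the middle of the list stops the loop immediately, so the returned Out carries that step's error together with the metadata collected up to that point.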

SequentialChain.__init__

SequentialChain(
    chains: List[BaseChain],
    name: str = "SequentialChain",
    config: Optional[ChainConfig] = None,
    tracer: Optional[ExecutionTracer] = None
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| chains | List[BaseChain] | required | Ordered list of chains to execute. Execution order follows the list order exactly. |
| name | str | "SequentialChain" | Human-readable label for this pipeline. |
| config | Optional[ChainConfig] | None | Resilience settings (retries, timeout, fallback) applied to the sequential pipeline as a whole, not to individual steps. Each step uses its own config. |
| tracer | Optional[ExecutionTracer] | None | Shared tracer. When set, propagated to every sub-chain so child spans appear in the same tree. |

SequentialChain.add

add(chain: BaseChain) -> "SequentialChain"

Appends a chain to the end of the step list and returns self, enabling a fluent builder pattern.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| chain | BaseChain | The chain to append. Executed after all currently registered steps. |

Returns

SequentialChain — Returns self for method chaining.

Example

pipeline = SequentialChain([])
pipeline.add(step1).add(step2).add(step3)

# Equivalent to:
pipeline = SequentialChain([step1, step2, step3])

ParallelChain

ParallelChain executes all branches concurrently using asyncio.gather, feeding each branch the same input. Results are aggregated into a dict (named branches) or list (indexed branches). Highly effective for fan-out patterns where the same input needs to be processed by multiple independent chains.

from fennec_community.chain import ParallelChain, TransformChain
import asyncio

parallel = ParallelChain({
    "original": TransformChain(lambda x: x),
    "upper":    TransformChain(str.upper),
    "words":    TransformChain(str.split),
})

result = asyncio.run(parallel.arun("hello world"))
print(result)
# {
#   "original": "hello world",
#   "upper": "HELLO WORLD",
#   "words": ["hello", "world"]
# }

ParallelChain.__init__

ParallelChain(
    chains: Union[Dict[str, BaseChain], List[BaseChain]],
    name: str = "ParallelChain",
    fail_fast: bool = True,
    config: Optional[ChainConfig] = None,
    tracer: Optional[ExecutionTracer] = None
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| chains | Union[Dict[str, BaseChain], List[BaseChain]] | required | Either a dict mapping branch labels to chains (result is a dict), or a list of chains (result is a list in the same order). |
| name | str | "ParallelChain" | Human-readable label. |
| fail_fast | bool | True | When True, if any branch raises an exception, the entire asyncio.gather is cancelled and the exception propagates. When False, all branches run to completion regardless of failures; results include None for failed branches, and output.metadata["branch_errors"] contains the error messages. The overall output is success=True if at least one branch succeeded. |
| config | Optional[ChainConfig] | None | Resilience settings for the parallel container chain. |
| tracer | Optional[ExecutionTracer] | None | Shared tracer propagated to all branches. |

Example — fail_fast=False (partial results):

parallel = ParallelChain(
    {"fast": fast_chain, "slow": potentially_failing_chain},
    fail_fast=False
)
# Inside an async function; arun() returns the data field of the ChainOutput
result = await parallel.arun("data")
# result == {"fast": "ok", "slow": None}
# The ChainOutput's metadata["branch_errors"] == {"slow": "timeout"}
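The partial-results mode can be sketched with asyncio.gather and return_exceptions=True, which returns exceptions as values instead of raising them. fan_out is a hypothetical helper, not the library's implementation; it returns the results and the error map as a tuple purely for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

Branch = Callable[[Any], Awaitable[Any]]


async def fan_out(branches: Dict[str, Branch], data: Any) -> Tuple[Dict[str, Any], Dict[str, str]]:
    names = list(branches)
    # return_exceptions=True lets every branch finish and hands back exceptions as values.
    outcomes = await asyncio.gather(
        *(branches[name](data) for name in names),
        return_exceptions=True,
    )
    results: Dict[str, Any] = {}
    errors: Dict[str, str] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            results[name] = None  # failed branches are reported as None
            errors[name] = str(outcome)
        else:
            results[name] = outcome
    return results, errors
```

One asyncio detail worth keeping in mind: without return_exceptions, gather propagates the first exception to the caller but does not cancel the other awaitables by itself, so any cancellation in fail-fast mode has to be done explicitly by the surrounding code.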

ConditionalChain

ConditionalChain evaluates a condition function at runtime to select and execute one of several named branches. This enables dynamic routing: the same pipeline can follow different paths based on the actual input data.

from fennec_community.chain import ConditionalChain, TransformChain
import asyncio

def router(data: dict) -> str:
    return "high" if data.get("score", 0) > 0.7 else "low"

chain = ConditionalChain(
    condition=router,
    branches={
        "high": TransformChain(lambda d: f"HIGH PRIORITY: {d}"),
        "low":  TransformChain(lambda d: f"Normal: {d}"),
    },
    default=TransformChain(lambda d: f"Unknown: {d}")
)

result = asyncio.run(chain.arun({"score": 0.9, "text": "urgent"}))
print(result)  # "HIGH PRIORITY: {'score': 0.9, 'text': 'urgent'}"

ConditionalChain.__init__

ConditionalChain(
    condition: Callable[[Any], str],
    branches: Dict[str, BaseChain],
    default: Optional[BaseChain] = None,
    name: str = "ConditionalChain",
    config: Optional[ChainConfig] = None,
    tracer: Optional[ExecutionTracer] = None
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| condition | Callable[[Any], str] | required | A callable that receives input_data.data and returns a str key. The key is looked up in branches. If the condition raises an exception, a failed ChainOutput is returned immediately. |
| branches | Dict[str, BaseChain] | required | Dictionary mapping string route keys to chains. Only one branch is executed per call: the one whose key matches the condition's return value. |
| default | Optional[BaseChain] | None | Fallback chain executed when the condition returns a key not present in branches. If None and the key is missing, ChainOutput(success=False) is returned with a descriptive error. |
| name | str | "ConditionalChain" | Human-readable label. |
| config | Optional[ChainConfig] | None | Resilience settings for the router itself (not individual branches). |
| tracer | Optional[ExecutionTracer] | None | Shared tracer. Propagated to the selected branch. |

Routing metadata: The selected route key is stored in output.metadata["selected_route"] for auditability.
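The routing rules above (key lookup, default fallback, failure on an unknown key or a raising condition) can be sketched as a single function. route is a hypothetical helper that returns a (success, result_or_error, metadata) tuple instead of a ChainOutput, and the error strings are illustrative.

```python
from typing import Any, Callable, Dict, Optional, Tuple

Handler = Callable[[Any], Any]


def route(
    condition: Callable[[Any], str],
    branches: Dict[str, Handler],
    data: Any,
    default: Optional[Handler] = None,
) -> Tuple[bool, Any, Dict[str, Any]]:
    """Return (success, result_or_error, metadata)."""
    try:
        key = condition(data)
    except Exception as exc:
        # A failing condition produces a failed result instead of propagating.
        return False, f"condition raised: {exc}", {}
    handler = branches.get(key, default)
    if handler is None:
        return False, f"no branch for route {key!r}", {"selected_route": key}
    # Exactly one handler runs; the chosen key is recorded for auditing.
    return True, handler(data), {"selected_route": key}
```

Only the selected handler is ever called, so expensive branches cost nothing unless the condition actually picks them.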


TransformChain

TransformChain is the simplest concrete chain: it wraps any synchronous or asynchronous callable and applies it to input_data.data. Use it to integrate any pure function into a pipeline without boilerplate.

from fennec_community.chain import TransformChain
import asyncio
import json

parse_json  = TransformChain(json.loads,    name="json_parser")
to_upper    = TransformChain(str.upper,     name="to_upper")
word_count  = TransformChain(lambda s: len(s.split()), name="word_count")

pipeline = parse_json >> TransformChain(lambda d: d["text"]) >> to_upper
result = asyncio.run(pipeline.arun('{"text": "hello world"}'))
print(result)  # "HELLO WORLD"

TransformChain.__init__

TransformChain(
    transform_fn: Callable[[Any], Any],
    name: Optional[str] = None,
    config: Optional[ChainConfig] = None,
    tracer: Optional[ExecutionTracer] = None
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| transform_fn | Callable[[Any], Any] | required | The transformation function. Receives one positional argument (input_data.data) and must return the transformed value. Can be sync or async (async def). Coroutine functions are detected and awaited automatically. |
| name | Optional[str] | None | Human-readable label. Defaults to transform_fn.__name__ if available, otherwise "TransformChain". |
| config | Optional[ChainConfig] | None | Resilience settings. |
| tracer | Optional[ExecutionTracer] | None | Shared tracer. |

Error handling: Any exception raised by transform_fn is caught and returned as ChainOutput(success=False, error="TransformChain '{name}' raised: {exc}") rather than propagating directly; BaseChain._guarded_run then decides whether to retry.
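One common way to accept both sync and async callables is to call the function and await the result only if it is awaitable. The sketch below uses inspect.isawaitable for that check; apply_transform and default_name are hypothetical helpers, and the library may instead inspect the function itself before calling it.

```python
import asyncio
import inspect
from typing import Any, Callable


async def apply_transform(fn: Callable[[Any], Any], data: Any) -> Any:
    result = fn(data)
    # Covers `async def` functions as well as sync callables that return an awaitable.
    if inspect.isawaitable(result):
        result = await result
    return result


def default_name(fn: Callable[[Any], Any]) -> str:
    # Mirrors the documented naming rule: use __name__ when the callable has one.
    return getattr(fn, "__name__", "TransformChain")


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2
```

Checking the returned value rather than the function also handles callables such as functools.partial objects wrapping a coroutine function, which a check on the callable alone can miss on some Python versions.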


Module: Registry

ChainRegistry

ChainRegistry is a global class-level registry (not an instance — all methods are @classmethod) that maps string aliases to BaseChain subclasses. It enables plugin-style dynamic chain lookup and instantiation without importing specific classes. The four built-in chain types are pre-registered at import time.

Pre-registered aliases:

| Alias | Class |
| --- | --- |
| "sequential" | SequentialChain |
| "parallel" | ParallelChain |
| "conditional" | ConditionalChain |
| "transform" | TransformChain |

from fennec_community.chain import ChainRegistry

# Look up and instantiate a registered chain
cls = ChainRegistry.get("transform")
chain = cls(str.upper)

# Or in one call
chain = ChainRegistry.build("transform", str.upper)

ChainRegistry.register

@classmethod
register(alias: str, chain_cls: Type[BaseChain]) -> None

Registers a BaseChain subclass under a string alias. If the alias already exists, a warning is logged and the previous registration is overwritten.

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| alias | str | Unique string key to associate with the class. Convention: lowercase with underscores (e.g., "my_llm_chain"). |
| chain_cls | Type[BaseChain] | The class object (not an instance) to register. Must be a subclass of BaseChain. |

Returns

None

Raises

  • TypeError — if chain_cls is not a class or is not a subclass of BaseChain.

Example

class MyLLMChain(BaseChain):
    ...

ChainRegistry.register("llm", MyLLMChain)

ChainRegistry.chain

@classmethod
chain(alias: str) -> Callable[[Type[BaseChain]], Type[BaseChain]]

A class decorator that registers the decorated class under alias at definition time. The class is returned unchanged — decoration is transparent to the class itself.

Parameters

| Parameter | Type | Description |
|---|---|---|
| alias | str | The registry key under which the class will be registered. |

Returns

A decorator that accepts and returns a Type[BaseChain].

Example

@ChainRegistry.chain("sentiment")
class SentimentChain(BaseChain):
    async def _aexecute(self, inp):
        score = my_model.predict(inp.data)
        return ChainOutput(data=score, success=True)

# Now accessible:
chain = ChainRegistry.build("sentiment")

ChainRegistry.get

@classmethod
get(alias: str) -> Type[BaseChain]

Returns the class registered under alias. Does not instantiate it.

Parameters

| Parameter | Type | Description |
|---|---|---|
| alias | str | The registered alias to look up. |

Returns

Type[BaseChain] — The registered class object.

Raises

  • KeyError — with a message listing all available aliases if alias is not registered.

Example

TransformCls = ChainRegistry.get("transform")
chain = TransformCls(str.upper, name="my_upper")

ChainRegistry.build

@classmethod
build(alias: str, *args: Any, **kwargs: Any) -> BaseChain

Looks up the class registered under alias and instantiates it with the provided arguments. Combines get() and class construction in one call.

Parameters

| Parameter | Type | Description |
|---|---|---|
| alias | str | The registered alias to look up and instantiate. |
| *args | Any | Positional arguments forwarded to the class constructor. |
| **kwargs | Any | Keyword arguments forwarded to the class constructor. |

Returns

BaseChain — A new instance of the registered class.

Raises

  • KeyError — if alias is not registered (same as get()).

Example

transform_chain = ChainRegistry.build("transform", str.strip, name="strip_ws")
seq = ChainRegistry.build("sequential", [step1, step2], name="my_pipeline")

ChainRegistry.list_all

@classmethod
list_all() -> Dict[str, str]

Returns a dictionary mapping every registered alias to its corresponding class name. Useful for introspection, debugging, and building dynamic UI menus.

Parameters

None.

Returns

Dict[str, str]: a mapping of {alias: class_name} for all registered entries.

Example

print(ChainRegistry.list_all())
# {
#   'sequential':  'SequentialChain',
#   'parallel':    'ParallelChain',
#   'conditional': 'ConditionalChain',
#   'transform':   'TransformChain',
#   'sentiment':   'SentimentChain',   ← custom registration
# }

ChainRegistry.unregister

@classmethod
unregister(alias: str) -> None

Removes the alias from the registry. Silently does nothing if the alias is not present. Primarily intended for testing to clean up custom registrations.

Parameters

| Parameter | Type | Description |
|---|---|---|
| alias | str | The alias to remove. |

Returns

None


ChainRegistry.clear

@classmethod
clear() -> None

Wipes all entries from the registry, including the built-in chains. After clear(), all aliases — including "sequential", "parallel", "conditional", and "transform" — are gone. Primarily intended for test isolation.

Parameters

None.

Returns

None

Warning: Calling clear() in production code removes the built-in chain registrations. Use unregister() for targeted removals.


Module: Utils

cached

cached(fn: Callable) -> Callable

A decorator that adds in-memory memoisation to any synchronous callable. The cache key is a SHA-256 hash of the JSON-serialised arguments (args + kwargs). Unhashable or non-serialisable arguments fall back to a direct (non-cached) call rather than raising an error.

from fennec_community.chain import cached

@cached
def expensive_lookup(query: str) -> str:
    return slow_database.fetch(query)

result1 = expensive_lookup("hello")  # executes the function
result2 = expensive_lookup("hello")  # returns from cache ⚡

Parameters

| Parameter | Type | Description |
|---|---|---|
| fn | Callable | Any synchronous callable to memoise. The original function's __name__, __doc__, and other attributes are preserved via functools.wraps. |

Returns

Callable — The wrapped, cache-enabled function. The wrapper additionally exposes a cache_clear() method to wipe the cache store.

Cache invalidation:

expensive_lookup.cache_clear()  # wipes all cached entries for this function

Note: Each decorated function has its own cache, held in the wrapper's closure. It is not shared between different decorated functions or across processes, and it is not thread-safe for concurrent writes. For thread-safe or distributed caching, use CachingChain with an external backend.
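
The keying scheme described above can be sketched as follows. This is a hedged illustration, not the source of cached: memoise and size are hypothetical names, and the exact JSON payload layout is an assumption.

```python
import functools
import hashlib
import json
from typing import Any, Callable, Dict, List


def memoise(fn: Callable) -> Callable:
    """Memoise fn, keyed by a SHA-256 digest of its JSON-serialised arguments."""
    store: Dict[str, Any] = {}  # lives in the closure, one store per decorated function

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        except (TypeError, ValueError):
            return fn(*args, **kwargs)  # not serialisable: call directly, skip the cache
        key = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        if key not in store:
            store[key] = fn(*args, **kwargs)
        return store[key]

    wrapper.cache_clear = store.clear  # mirrors the documented cache_clear() hook
    return wrapper


calls: List[Any] = []


@memoise
def size(value: Any) -> int:
    calls.append(value)  # record every real execution
    return len(value)


first = size("abc")    # executes the function
second = size("abc")   # served from the cache
size({1, 2})           # sets are not JSON-serialisable, so this call runs...
size({1, 2})           # ...and so does this one
```

After these four calls the function body has run three times: once for "abc" and twice for the set, which bypasses the cache on every call.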


CachingChain

CachingChain wraps any BaseChain instance and adds an in-memory result cache keyed by a SHA-256 hash of the serialised input data. Cache hits bypass the wrapped chain entirely. Ideal for expensive operations like LLM calls that may receive the same input repeatedly.

from fennec_community.chain import CachingChain

slow_chain = MyExpensiveLLMChain()
cached_chain = CachingChain(slow_chain)

result1 = await cached_chain.arun("What is AI?")  # executes slow_chain
result2 = await cached_chain.arun("What is AI?")  # returns from cache ⚡

print(cached_chain.cache_stats)
# {"hits": 1, "misses": 1, "size": 1}
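
The hit and miss accounting can be illustrated with a small stand-alone wrapper. This is a sketch under stated assumptions rather than CachingChain itself: CachingWrapper and slow_upper are hypothetical, and the use of json.dumps with default=repr to serialise the input is an assumption about how a key might be derived.

```python
import asyncio
import hashlib
import json
from typing import Any, Awaitable, Callable, Dict


class CachingWrapper:
    """Hypothetical async wrapper that caches results by a hash of the input."""

    def __init__(self, inner: Callable[[Any], Awaitable[Any]]) -> None:
        self._inner = inner
        self._store: Dict[str, Any] = {}
        self._hits = 0
        self._misses = 0

    @staticmethod
    def _key(data: Any) -> str:
        # default=repr lets this sketch hash values that JSON cannot encode
        raw = json.dumps(data, sort_keys=True, default=repr)
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    async def arun(self, data: Any) -> Any:
        key = self._key(data)
        if key in self._store:
            self._hits += 1
            return self._store[key]  # the inner callable is skipped entirely
        self._misses += 1
        result = await self._inner(data)
        self._store[key] = result
        return result

    def cache_clear(self) -> None:
        self._store.clear()
        self._hits = self._misses = 0

    @property
    def cache_stats(self) -> Dict[str, int]:
        return {"hits": self._hits, "misses": self._misses, "size": len(self._store)}


async def slow_upper(text: str) -> str:
    await asyncio.sleep(0.01)  # stands in for an expensive call
    return text.upper()


async def demo() -> Dict[str, int]:
    wrapped = CachingWrapper(slow_upper)
    await wrapped.arun("what is ai?")  # miss: runs slow_upper
    await wrapped.arun("what is ai?")  # hit: returned from the store
    return wrapped.cache_stats


stats = asyncio.run(demo())
```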

CachingChain.__init__

CachingChain(
    chain: BaseChain,
    name: Optional[str] = None,
    config: Optional[ChainConfig] = None,
    tracer: Optional[ExecutionTracer] = None
)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| chain | BaseChain | required | The chain whose results should be cached. |
| name | str \| None | None | Human-readable label. Defaults to "Cached({chain.name})". |
| config | ChainConfig \| None | None | Resilience settings for the caching wrapper itself. |
| tracer | ExecutionTracer \| None | None | Shared tracer. Propagated to the inner chain on cache misses. |

CachingChain.cache_clear

cache_clear() -> None

Clears all cached entries and resets the hit_count and miss_count counters to zero. Use this to force re-execution of the inner chain (e.g., after the underlying data changes).

Parameters

None.

Returns

None

Example

cached_chain.cache_clear()
# All subsequent calls will miss the cache and execute the inner chain

CachingChain.cache_stats (property)

@property
cache_stats -> Dict[str, int]

Returns a snapshot of cache performance counters.

Returns

Dict[str, int] with keys:

| Key | Description |
|---|---|
| hits | Number of times a result was served from cache. |
| misses | Number of times the inner chain was executed (cache miss). |
| size | Current number of entries in the cache store. |

Example

stats = cached_chain.cache_stats
total = stats["hits"] + stats["misses"]
hit_rate = stats["hits"] / total if total else 0.0  # avoid dividing by zero before the first call
print(f"Hit rate: {hit_rate:.1%}")

DeclarativeChainBuilder

DeclarativeChainBuilder enables building complete chain pipelines from plain Python dicts, JSON strings, or YAML strings — without writing any Python class definitions. This is particularly useful for configuration-driven architectures, no-code tooling, or dynamic pipeline construction from external config files.

All methods are @staticmethod — no instance is needed.

from fennec_community.chain import DeclarativeChainBuilder
import asyncio

spec = {
    "type": "sequential",
    "name": "text_pipeline",
    "steps": [
        {"type": "transform", "fn_name": "strip"},
        {"type": "transform", "fn_name": "upper"},
    ]
}

pipeline = DeclarativeChainBuilder.from_dict(
    spec,
    fn_registry={"strip": str.strip, "upper": str.upper}
)

result = asyncio.run(pipeline.arun("  hello world  "))
print(result)  # "HELLO WORLD"

DeclarativeChainBuilder.from_dict

@staticmethod
from_dict(
    spec: Dict[str, Any],
    fn_registry: Optional[Dict[str, Callable]] = None,
    chain_registry: Optional[Dict[str, Type[BaseChain]]] = None
) -> BaseChain

Builds a chain from a plain Python dictionary specification. The method is recursive — nested steps and branches dicts are resolved by calling from_dict on each nested spec.

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| spec | Dict[str, Any] | required | The chain specification dictionary. Must contain a "type" key. See spec format below. |
| fn_registry | Dict[str, Callable] \| None | None | Maps fn_name strings in transform specs to actual callable objects. Required when using TransformChain steps with named functions. |
| chain_registry | Dict[str, Type[BaseChain]] \| None | None | Maps custom type strings to chain classes, extending the global ChainRegistry. |

Returns

BaseChain — The constructed chain or pipeline ready for execution.

Raises

  • ValueError — if type is unknown, or if a transform step references a fn_name not in fn_registry.

Spec format reference:

# Sequential pipeline
{
    "type": "sequential",
    "name": "my_pipeline",       # optional
    "steps": [                   # list of nested specs
        {"type": "transform", "fn_name": "strip"},
        {"type": "transform", "fn_name": "upper"},
    ]
}

# Parallel fan-out (named branches → dict result)
{
    "type": "parallel",
    "branches": {
        "summary":   {"type": "transform", "fn_name": "summarize"},
        "sentiment": {"type": "transform", "fn_name": "analyze"},
    }
}

# Parallel fan-out (list branches → list result)
{
    "type": "parallel",
    "branches": [
        {"type": "transform", "fn_name": "fn_a"},
        {"type": "transform", "fn_name": "fn_b"},
    ]
}

# Transform step — callable by value
{"type": "transform", "fn": my_callable_object}

# Transform step — callable by name from fn_registry
{"type": "transform", "fn_name": "my_fn", "name": "optional_label"}

# Any alias registered in ChainRegistry
{"type": "sentiment", "name": "sentiment_step"}

Example

spec = {
    "type": "sequential",
    "steps": [
        {"type": "transform", "fn_name": "strip"},
        {
            "type": "parallel",
            "branches": {
                "upper": {"type": "transform", "fn_name": "upper"},
                "length": {"type": "transform", "fn_name": "length"},
            }
        }
    ]
}

pipeline = DeclarativeChainBuilder.from_dict(spec, fn_registry={
    "strip":  str.strip,
    "upper":  str.upper,
    "length": len,
})

result = asyncio.run(pipeline.arun("  hello  "))
print(result)  # {"upper": "HELLO", "length": 5}
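
The recursive resolution of nested specs can be sketched without the library. The sketch below is illustrative only: build is a hypothetical function that returns plain synchronous callables instead of chain objects, runs "parallel" branches one after another, and handles only the three spec types shown.

```python
from typing import Any, Callable, Dict, Optional

Step = Callable[[Any], Any]


def build(spec: Dict[str, Any], fn_registry: Optional[Dict[str, Step]] = None) -> Step:
    """Recursively turn a spec dict into a plain callable."""
    fns = fn_registry or {}
    kind = spec.get("type")

    if kind == "transform":
        if "fn" in spec:  # callable passed by value
            return spec["fn"]
        name = spec.get("fn_name")
        if name not in fns:
            raise ValueError(f"Unknown fn_name: {name!r}")
        return fns[name]

    if kind == "sequential":
        steps = [build(s, fns) for s in spec["steps"]]  # recursion on nested specs

        def run_sequential(value: Any) -> Any:
            for step in steps:
                value = step(value)
            return value

        return run_sequential

    if kind == "parallel":
        branches = spec["branches"]
        if isinstance(branches, dict):  # named branches give a dict result
            named = {key: build(s, fns) for key, s in branches.items()}
            return lambda value: {key: step(value) for key, step in named.items()}
        indexed = [build(s, fns) for s in branches]  # list branches give a list result
        return lambda value: [step(value) for step in indexed]

    raise ValueError(f"Unknown chain type: {kind!r}")


spec = {
    "type": "sequential",
    "steps": [
        {"type": "transform", "fn_name": "strip"},
        {"type": "parallel", "branches": {
            "upper": {"type": "transform", "fn_name": "upper"},
            "length": {"type": "transform", "fn_name": "length"},
        }},
    ],
}
pipeline = build(spec, {"strip": str.strip, "upper": str.upper, "length": len})
result = pipeline("  hello  ")  # {"upper": "HELLO", "length": 5}
```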

DeclarativeChainBuilder.from_json

@staticmethod
from_json(
    json_str: str,
    fn_registry: Optional[Dict[str, Callable]] = None,
    chain_registry: Optional[Dict[str, Type[BaseChain]]] = None
) -> BaseChain

Parses a JSON string into a spec dictionary, then delegates to from_dict. Use for configuration stored in JSON files, API payloads, or environment variables.

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| json_str | str | required | A valid JSON string whose top-level object is a chain spec dictionary. |
| fn_registry | Dict[str, Callable] \| None | None | Function registry forwarded to from_dict. |
| chain_registry | Dict[str, Type[BaseChain]] \| None | None | Chain class registry forwarded to from_dict. |

Returns

BaseChain — The constructed pipeline.

Raises

  • json.JSONDecodeError — if json_str is not valid JSON.
  • ValueError — for unknown chain types or missing function references (same as from_dict).

Example

json_spec = '''
{
  "type": "sequential",
  "steps": [
    {"type": "transform", "fn_name": "strip"},
    {"type": "transform", "fn_name": "upper"}
  ]
}
'''

pipeline = DeclarativeChainBuilder.from_json(
    json_spec,
    fn_registry={"strip": str.strip, "upper": str.upper}
)
result = asyncio.run(pipeline.arun("  hello  "))
print(result)  # "HELLO"

DeclarativeChainBuilder.from_yaml

@staticmethod
from_yaml(
    yaml_str: str,
    fn_registry: Optional[Dict[str, Callable]] = None,
    chain_registry: Optional[Dict[str, Type[BaseChain]]] = None
) -> BaseChain

Parses a YAML string into a spec dictionary, then delegates to from_dict. Requires pyyaml to be installed. Use for human-authored configuration files.

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| yaml_str | str | required | A valid YAML string whose top-level mapping is a chain spec. |
| fn_registry | Dict[str, Callable] \| None | None | Function registry forwarded to from_dict. |
| chain_registry | Dict[str, Type[BaseChain]] \| None | None | Chain class registry forwarded to from_dict. |

Returns

BaseChain — The constructed pipeline.

Raises

  • ImportError — if pyyaml is not installed. Message: "pyyaml is required for YAML support: pip install pyyaml".
  • ValueError — for unknown chain types or missing function references.

Example

yaml_spec = """
type: sequential
name: text_pipeline
steps:
  - type: transform
    fn_name: strip
  - type: parallel
    branches:
      upper:
        type: transform
        fn_name: upper
      words:
        type: transform
        fn_name: split
"""

pipeline = DeclarativeChainBuilder.from_yaml(
    yaml_spec,
    fn_registry={
        "strip": str.strip,
        "upper": str.upper,
        "split": str.split,
    }
)

result = asyncio.run(pipeline.arun("  hello world  "))
print(result)  # {"upper": "HELLO WORLD", "words": ["hello", "world"]}
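
Both from_json and from_yaml are described as thin parsing layers in front of from_dict. A sketch of that parsing step, including the optional pyyaml import, might look like the following. The helper names parse_json_spec, parse_yaml_spec, and _check_spec are hypothetical, and the check for a top-level "type" key is an assumed validation, not confirmed library behaviour.

```python
import json
from typing import Any, Dict


def _check_spec(spec: Any) -> Dict[str, Any]:
    # Assumed validation: the documented spec format requires a top-level "type" key.
    if not isinstance(spec, dict) or "type" not in spec:
        raise ValueError("spec must be a mapping with a 'type' key")
    return spec


def parse_json_spec(text: str) -> Dict[str, Any]:
    """Parse a JSON spec; malformed input raises json.JSONDecodeError."""
    return _check_spec(json.loads(text))


def parse_yaml_spec(text: str) -> Dict[str, Any]:
    """Parse a YAML spec; pyyaml is imported lazily so it stays optional."""
    try:
        import yaml
    except ImportError as exc:
        raise ImportError("pyyaml is required for YAML support: pip install pyyaml") from exc
    return _check_spec(yaml.safe_load(text))


spec = parse_json_spec('{"type": "sequential", "steps": []}')
```

Note that json.JSONDecodeError is a subclass of ValueError, so callers that want to tell malformed JSON apart from an invalid spec should catch json.JSONDecodeError first.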

Operator Reference

| Operator | Method | Behaviour |
|---|---|---|
| a >> b | __rshift__ | Sequential composition: run a, then b. Returns SequentialChain([a, b]). |
| a \| b | __or__ | Alias for >>. Identical behaviour. |
| chain(data) | __call__ | Calls chain.run(data). Makes any chain callable as a function. |
| a >> b >> c | Flattened >> | Produces SequentialChain([a, b, c]), never a nested SequentialChain. |
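
The flattening rule in the last row can be sketched with two small classes. This is an illustration of the operator-overloading technique only: Step and Sequence are hypothetical stand-ins for BaseChain and SequentialChain, and splicing in a right-hand Sequence as well is an assumption of this sketch.

```python
from typing import Any, Callable, List


class Step:
    """Hypothetical minimal chain: wraps one function and supports >>, | and call syntax."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def run(self, data: Any) -> Any:
        return self.fn(data)

    def __call__(self, data: Any) -> Any:
        return self.run(data)

    def __rshift__(self, other: "Step") -> "Sequence":
        # Splice in the steps of any existing Sequence so the result stays flat.
        left = self.steps if isinstance(self, Sequence) else [self]
        right = other.steps if isinstance(other, Sequence) else [other]
        return Sequence(left + right)

    __or__ = __rshift__  # a | b behaves exactly like a >> b


class Sequence(Step):
    """Runs its steps in order, feeding each output into the next step."""

    def __init__(self, steps: List[Step]) -> None:
        self.steps = list(steps)

    def run(self, data: Any) -> Any:
        for step in self.steps:
            data = step.run(data)
        return data


pipeline = Step(str.strip) >> Step(str.upper) >> Step(str.split)
tokens = pipeline("  a b  ")  # ["A", "B"]
```

Since >> is left-associative, the first >> yields a two-step Sequence and the second one appends to a copy of its step list, so pipeline.steps holds three plain Step objects.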

Error Handling & Resilience

Retry with back-off

from fennec_community.chain import ChainConfig

# 3 retries, 1-second delay between attempts
cfg = ChainConfig(retries=3, retry_delay=1.0)
chain = MyChain(config=cfg)

Timeout

cfg = ChainConfig(timeout=5.0)  # fail after 5 seconds
chain = MyChain(config=cfg)

When a timeout fires, the chain returns a failed output whose error message reports the elapsed time and the configured limit, for example ChainOutput(success=False, error="Timeout after 5.01s (limit 5.0s)").

Fallback chain

from fennec_community.chain import ChainConfig, TransformChain

fallback = TransformChain(lambda _: "default answer", name="fallback")
cfg = ChainConfig(retries=2, fallback_chain=fallback)
primary = MyUnreliableChain(config=cfg)

# If primary fails all 3 attempts (1 initial + 2 retries), the fallback chain is called with the original input
result = await primary.arun("query")
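
How retries, a per-attempt timeout, and a fallback interact can be sketched with asyncio alone. This is a hedged illustration, not BaseChain._guarded_run: guarded_run, flaky, slow, and default_answer are hypothetical, and a fixed delay between attempts is assumed.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

AsyncFn = Callable[[Any], Awaitable[Any]]


async def guarded_run(
    fn: AsyncFn,
    data: Any,
    retries: int = 0,
    retry_delay: float = 0.0,
    timeout: Optional[float] = None,
    fallback: Optional[AsyncFn] = None,
) -> Any:
    """Try fn up to retries + 1 times, then hand the original input to fallback."""
    last_error: Optional[BaseException] = None
    for attempt in range(retries + 1):
        try:
            # wait_for applies no limit when timeout is None
            return await asyncio.wait_for(fn(data), timeout)
        except Exception as exc:  # includes asyncio.TimeoutError
            last_error = exc
            if attempt < retries:
                await asyncio.sleep(retry_delay)  # fixed delay between attempts
    if fallback is not None:
        return await fallback(data)
    raise RuntimeError(f"all {retries + 1} attempts failed") from last_error


attempts: List[str] = []


async def flaky(query: str) -> str:
    attempts.append(query)
    raise ConnectionError("upstream unavailable")


async def slow(query: str) -> str:
    await asyncio.sleep(1)
    return "late"


async def default_answer(query: str) -> str:
    return "default answer"


answer = asyncio.run(guarded_run(flaky, "query", retries=2, retry_delay=0.01, fallback=default_answer))
timed_out = asyncio.run(guarded_run(slow, "query", timeout=0.01, fallback=default_answer))
```

With retries=2 the flaky function is attempted three times before the fallback runs, and the slow function is cut off by the timeout and replaced by the fallback result.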

Partial failures in ParallelChain

from fennec_community.chain import ParallelChain

parallel = ParallelChain(
    {"branch_a": chain_a, "branch_b": chain_b},
    fail_fast=False  # collect all results, don't abort on first failure
)
output = await parallel.arun("data")

if output.metadata.get("branch_errors"):
    print("Some branches failed:", output.metadata["branch_errors"])
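
Collecting per-branch errors instead of aborting on the first failure can be sketched with asyncio.gather and return_exceptions=True. This is an illustration of the technique, not ParallelChain's implementation: fan_out and the two branch functions are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

Branch = Callable[[Any], Awaitable[Any]]


async def fan_out(branches: Dict[str, Branch], data: Any) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Run every branch concurrently; return (results, errors) keyed by branch name."""
    names = list(branches)
    outcomes = await asyncio.gather(
        *(branches[name](data) for name in names),
        return_exceptions=True,  # a failing branch does not cancel the others
    )
    results: Dict[str, Any] = {}
    errors: Dict[str, str] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            errors[name] = str(outcome)
        else:
            results[name] = outcome
    return results, errors


async def upper(text: str) -> str:
    return text.upper()


async def broken(text: str) -> str:
    raise ValueError("branch_b exploded")


results, errors = asyncio.run(fan_out({"branch_a": upper, "branch_b": broken}, "data"))
```

Here results holds the output of branch_a and errors holds the message from branch_b, which is roughly the information the branch_errors metadata entry is described as carrying.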

Full Integration Examples

Example 1 — Custom LLM Chain with Retries, Tracing, and Stats

import asyncio
from fennec_community.chain import BaseChain, ChainInput, ChainOutput, ChainConfig, ExecutionTracer

class LLMChain(BaseChain):
    def __init__(self, llm, **kwargs):
        super().__init__(
            name="LLMChain",
            config=ChainConfig(retries=2, timeout=30.0, verbose=True),
            **kwargs
        )
        self.llm = llm

    async def _aexecute(self, inp: ChainInput) -> ChainOutput:
        answer = await self.llm.agenerate(inp.data)
        return ChainOutput(data=answer, success=True)

llm_chain = LLMChain(my_llm)
result = asyncio.run(llm_chain.arun("What is machine learning?"))
print(result)

# Observability
print(llm_chain.get_stats())
import json
print(json.dumps(llm_chain.get_trace(), indent=2))

Example 2 — Multi-Stage Pipeline with Context

import asyncio
from fennec_community.chain import TransformChain, ChainInput, ChainContext

# Build context
ctx = ChainContext(session_id="session-001")
ctx["user_tier"] = "premium"

# Build pipeline
pipeline = (
    TransformChain(str.strip,  name="strip")
    >> TransformChain(str.lower, name="lowercase")
    >> TransformChain(str.split, name="tokenize")
)

# Pass context via ChainInput
inp = ChainInput(data="  Hello World  ", context=ctx)
result = asyncio.run(pipeline.arun(inp))
print(result)  # ["hello", "world"]

# Shared context is available to all steps
print(ctx["user_tier"])  # "premium" — still accessible

Example 3 — Parallel Fan-Out then Sequential Merge

import asyncio
from fennec_community.chain import TransformChain, ParallelChain

# Stage 1: parallel enrichment
enrich = ParallelChain({
    "upper":  TransformChain(str.upper),
    "words":  TransformChain(str.split),
    "length": TransformChain(len),
})

# Stage 2: merge results
def merge(d):
    return f"TEXT: {d['upper']} | WORDS: {d['words']} | LEN: {d['length']}"
merge_chain = TransformChain(merge, name="merge")

pipeline = enrich >> merge_chain
result = asyncio.run(pipeline.arun("hello world"))
print(result)
# "TEXT: HELLO WORLD | WORDS: ['hello', 'world'] | LEN: 11"

Example 4 — Conditional Routing Pipeline

import asyncio
from fennec_community.chain import ConditionalChain, TransformChain

def classify(data: str) -> str:
    return "question" if data.strip().endswith("?") else "statement"

question_chain  = TransformChain(lambda t: f"[Q] {t}", name="question_handler")
statement_chain = TransformChain(lambda t: f"[S] {t}", name="statement_handler")

router = ConditionalChain(
    condition=classify,
    branches={"question": question_chain, "statement": statement_chain},
    default=TransformChain(lambda t: f"[?] {t}", name="unknown_handler")
)

pipeline = TransformChain(str.strip) >> router

print(asyncio.run(pipeline.arun("What is AI?  ")))   # [Q] What is AI?
print(asyncio.run(pipeline.arun("AI is powerful."))) # [S] AI is powerful.

Example 5 — Caching Chain for Expensive Calls

import asyncio
from fennec_community.chain import CachingChain

expensive_chain = MyLLMChain()
cached = CachingChain(expensive_chain)

q = "Explain quantum computing"
r1 = asyncio.run(cached.arun(q))  # executes LLM
r2 = asyncio.run(cached.arun(q))  # cache hit ⚡
r3 = asyncio.run(cached.arun(q))  # cache hit ⚡

print(cached.cache_stats)  # {"hits": 2, "misses": 1, "size": 1}

cached.cache_clear()  # force refresh
r4 = asyncio.run(cached.arun(q))  # executes LLM again

Example 6 — Registry + Declarative Builder

import asyncio
from fennec_community.chain import ChainRegistry, DeclarativeChainBuilder, BaseChain, ChainInput, ChainOutput

# Register a custom chain
@ChainRegistry.chain("word_count")
class WordCountChain(BaseChain):
    async def _aexecute(self, inp: ChainInput) -> ChainOutput:
        return ChainOutput(data=len(inp.data.split()), success=True)

# Build from YAML config (loaded from file / env / API)
yaml_config = """
type: sequential
steps:
  - type: transform
    fn_name: strip
  - type: word_count
"""

pipeline = DeclarativeChainBuilder.from_yaml(
    yaml_config,
    fn_registry={"strip": str.strip}
)

result = asyncio.run(pipeline.arun("  hello beautiful world  "))
print(result)   # 3
print(ChainRegistry.list_all())

Example 7 — FastAPI Async Service

from fastapi import FastAPI
from fennec_community.chain import SequentialChain, TransformChain, CachingChain, ChainConfig

app = FastAPI()

pipeline = CachingChain(
    SequentialChain([
        TransformChain(str.strip, name="strip"),
        TransformChain(str.upper, name="upper"),
    ], config=ChainConfig(retries=2, timeout=5.0)),
)

@app.post("/process")
async def process(text: str):
    result = await pipeline.arun(text)
    return {
        "result": result,
        "cache_stats": pipeline.cache_stats,
        "chain_stats": pipeline.get_stats(),
    }

@app.get("/trace")
async def get_trace():
    return pipeline.get_trace()
