`federated_rag` — Enterprise API Reference
Table of Contents
- Overview
- Architecture
- Quick Start
- Enumerations
- Data Models
- Class: FederatedRAG
- Return Value Reference
- Streaming Strategies by AggregationMethod
- Backend Compatibility
- Circuit Breaker Behaviour
- Caching Behaviour
- Error Codes
- Complete Examples
Overview
federated_rag is a production-grade federation layer that sits on top of multiple Retrieval-Augmented Generation (RAG) backends and exposes a unified query interface. Instead of calling one RAG system and accepting its single answer, FederatedRAG queries all enabled, healthy backends in parallel, combines their responses according to a configurable aggregation strategy, and returns (or streams) the aggregated answer.
Key capabilities at a glance:
| Capability | Detail |
|---|---|
| Parallel querying | All enabled, healthy sources are queried concurrently via asyncio.gather |
| Aggregation strategies | Weighted, Voting, Ranking, Merge, Simple |
| Real-time streaming | Sync (stream) and async (astream) with per-chunk StreamChunk objects |
| Circuit breaking | Automatically skips repeatedly-failing sources; self-recovers after a timeout |
| TTL caching | Thread-safe in-memory cache; hits avoid redundant backend calls |
| Lifecycle hooks | Pre- and post-query callables for logging, metrics, and tracing |
| Zero-config fallback | If a backend exposes no streaming method (astream, stream, or generate_stream), the federation emits word-level synthetic chunks from its buffered answer |
Architecture
```text
┌─────────────────────────────────────────────────────────┐
│                      FederatedRAG                       │
│                                                         │
│  ┌─────────┐  ┌──────────────┐  ┌───────────────────┐   │
│  │TTLCache │  │LifecycleHooks│  │AggregationEngine  │   │
│  └─────────┘  └──────────────┘  └───────────────────┘   │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │               FederatedSource (×N)                │  │
│  │       name · rag_system · weight · timeout        │  │
│  │          CircuitBreaker · stats tracking          │  │
│  └───────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
                            │ parallel asyncio.gather / asyncio tasks
                            ▼
   [ RAG Backend A ]   [ RAG Backend B ]   [ RAG Backend C ]
```

Quick Start
```python
import asyncio

from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

# docs_rag, wiki_rag and arxiv_rag stand for your own RAG backend objects
# (see Backend Compatibility for the interface they must expose).

# 1. Instantiate the federation
fed = FederatedRAG(
    aggregation_method=AggregationMethod.WEIGHTED,
    cache_ttl=120.0,
)

# 2. Register your RAG backends
fed.add_source("docs", docs_rag, weight=1.5, timeout=8.0)
fed.add_source("wiki", wiki_rag, weight=1.0, timeout=5.0)
fed.add_source("arxiv", arxiv_rag, weight=0.8, timeout=12.0)

# 3a. Buffered (synchronous)
result = fed.query("What is Retrieval-Augmented Generation?")
print(result["answer"])

# 3b. Streaming (synchronous)
for chunk in fed.stream("Explain transformers"):
    print(chunk.text, end="", flush=True)


async def main():
    # 3c. Buffered (async): await is only valid inside a coroutine
    result = await fed.query_async("What is RAG?")
    print(result["answer"])

    # 3d. Streaming (async)
    async for chunk in fed.astream("Explain transformers"):
        print(chunk.text, end="", flush=True)


asyncio.run(main())
```

Enumerations
AggregationMethod
```python
from enum import Enum


class AggregationMethod(str, Enum):
    WEIGHTED = "weighted"
    VOTING = "voting"
    RANKING = "ranking"
    SIMPLE = "simple"
    MERGE = "merge"
```

Controls how responses from multiple sources are combined into a single answer.

| Member | Value | Description |
|---|---|---|
| WEIGHTED | "weighted" | Picks the source with the highest weight, breaking ties with score. Best for trusted primary sources. |
| VOTING | "voting" | Groups responses by their first 60 characters, then selects the group with the highest combined weight. Best for factual queries where consensus matters. |
| RANKING | "ranking" | Sorts all results by score × weight in descending order and returns the top result. Best when sources return numeric relevance scores. |
| SIMPLE | "simple" | Returns the answer from the first source that responds. Fastest, but applies no weighting or ranking. |
| MERGE | "merge" | Concatenates all unique answers, each prefixed with [source_name]. Best for information-gathering queries where diversity is valued. |
Streaming note: VOTING cannot be truly streamed, because it needs every answer before it can vote. When astream or stream is called with VOTING, the federation buffers everything via query_async and emits a single StreamChunk. All other methods stream natively.
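To make the five strategies concrete, here is a minimal, self-contained sketch that applies each rule from the table above to a list of per-source result dicts (the answer/source/weight/score shape listed under Return Value Reference). It illustrates the documented behaviour and is not the library's AggregationEngine: the function name `aggregate`, the assumption that results arrive in response order, the exact-text deduplication in MERGE, and the "[source] answer" prefix format are all assumptions.

```python
from collections import defaultdict
from typing import Any, Dict, List


def aggregate(results: List[Dict[str, Any]], method: str) -> str:
    """Hypothetical illustration of the documented aggregation rules.

    Each result carries "answer", "source", "weight" and "score"; `results`
    is assumed to be ordered by arrival (first responder first).
    """
    if not results:
        raise ValueError("no successful results to aggregate")
    if method == "simple":
        # First source that responded.
        return results[0]["answer"]
    if method == "weighted":
        # Highest weight wins; score breaks ties.
        return max(results, key=lambda r: (r["weight"], r["score"]))["answer"]
    if method == "ranking":
        # Highest score x weight wins.
        return max(results, key=lambda r: r["score"] * r["weight"])["answer"]
    if method == "voting":
        # Group answers by their first 60 characters; the group with the
        # largest combined weight wins and its first answer is returned.
        groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for r in results:
            groups[r["answer"][:60]].append(r)
        winner = max(groups.values(), key=lambda g: sum(r["weight"] for r in g))
        return winner[0]["answer"]
    if method == "merge":
        # Concatenate unique answers, each prefixed with its source name.
        seen: set = set()
        parts: List[str] = []
        for r in results:
            if r["answer"] not in seen:
                seen.add(r["answer"])
                parts.append(f"[{r['source']}] {r['answer']}")
        return "\n\n".join(parts)
    raise ValueError(f"unknown aggregation method: {method!r}")


results = [
    {"answer": "RAG combines retrieval with generation.", "source": "docs", "weight": 1.5, "score": 0.6},
    {"answer": "RAG combines retrieval with generation.", "source": "wiki", "weight": 1.0, "score": 0.7},
    {"answer": "RAG is a fine-tuning method.", "source": "arxiv", "weight": 2.0, "score": 0.9},
]
for m in ("simple", "weighted", "ranking", "voting", "merge"):
    print(f"{m:>8}: {aggregate(results, m)!r}")
```

With these inputs WEIGHTED and RANKING both pick the heavier arxiv answer, while VOTING picks the answer that docs and wiki agree on (combined weight 2.5 versus 2.0), which is the consensus trade-off described in the table.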
Data Models
StreamChunk
A dataclass representing a single unit of streamed output. Every call to stream() or astream() yields a sequence of these objects.
```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class StreamChunk:
    text: str
    source: str
    done: bool = False
    metadata: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    def __str__(self) -> str:
        # str(chunk) returns only the text fragment
        return self.text
```

| Field | Type | Description |
|---|---|---|
| text | str | The incremental text fragment for this chunk. Empty string on terminal (done/error) chunks. |
| source | str | The name of the RAG backend that produced this chunk, or `"__federation__"` for system-level messages. |
| done | bool | True on the final chunk for a given source. Signals that the source has finished producing output. |
| metadata | Optional[Dict[str, Any]] | Optional per-chunk payload. May include score, latency_ms, weight, or aggregation_method. |
| error | Optional[str] | Non-None when this chunk signals a failure. Inspect this field to distinguish error sentinels from normal done signals. |
Usage pattern:

```python
# Assumes `fed` is a configured FederatedRAG instance.
async def show_answer(fed):
    async for chunk in fed.astream("What is RAG?"):
        if chunk.error:
            print(f"\n[ERROR from {chunk.source}]: {chunk.error}")
        elif chunk.done:
            print(f"\n[{chunk.source} finished: {chunk.metadata}]")
        else:
            print(chunk.text, end="", flush=True)
```

str(chunk) returns chunk.text, so print(chunk) works naturally in simple pipelines.
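When a caller needs both live output and the final text per source, the three chunk kinds above (error sentinel, done terminator, text fragment) can be folded into a small collector. The sketch re-declares StreamChunk locally with the documented fields so it runs without the library; `collect_stream` and `fake_stream` are hypothetical names, and `fake_stream` merely stands in for `fed.astream(...)`. Checking `error` before `done` mirrors the usage pattern above, since an error sentinel may also be a terminal chunk.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, List, Optional


@dataclass
class StreamChunk:
    # Local mirror of the documented StreamChunk fields.
    text: str
    source: str
    done: bool = False
    metadata: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


async def collect_stream(chunks: AsyncIterator[StreamChunk]) -> Dict[str, Any]:
    """Accumulate text per source and record errors and finished sources."""
    texts: Dict[str, List[str]] = {}
    errors: Dict[str, str] = {}
    finished: List[str] = []
    async for chunk in chunks:
        if chunk.error:  # check errors first: they may also be terminal
            errors[chunk.source] = chunk.error
        elif chunk.done:
            finished.append(chunk.source)
        else:
            texts.setdefault(chunk.source, []).append(chunk.text)
    return {
        "answers": {src: "".join(parts) for src, parts in texts.items()},
        "errors": errors,
        "finished": finished,
    }


async def fake_stream() -> AsyncIterator[StreamChunk]:
    # Stand-in for fed.astream(...): two fragments, a terminator, an error sentinel.
    yield StreamChunk("Hello ", "docs")
    yield StreamChunk("world", "docs")
    yield StreamChunk("", "docs", done=True, metadata={"latency_ms": 12.0})
    yield StreamChunk("", "wiki", done=True, error="timeout")


summary = asyncio.run(collect_stream(fake_stream()))
print(summary)
```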
Class: FederatedRAG
The central class of the module. Manages source registration, parallel querying, aggregation, caching, and streaming.
```python
from fennec_community.rag.types.federated_rag import FederatedRAG
```

Constructor: `__init__`

```python
FederatedRAG(
    aggregation_method: AggregationMethod | str = AggregationMethod.WEIGHTED,
    min_sources: int = 1,
    cache_ttl: float = 60.0,
    cache_max_size: int = 256,
    stream_race_timeout: float = 2.0,
    pre_query_hook: Optional[Callable[[str, Optional[Dict]], None]] = None,
    post_query_hook: Optional[Callable[[Dict[str, Any]], None]] = None,
)
```

Purpose: Creates a new FederatedRAG instance with the given configuration. No backends are registered at construction time; use add_source to add them.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| aggregation_method | AggregationMethod \| str | AggregationMethod.WEIGHTED | Strategy used to merge responses from multiple sources. Accepts the enum member or its string value (e.g., "weighted"). |
| min_sources | int | 1 | Minimum number of active (enabled and circuit-closed) sources required before a query is executed. If the threshold is not met, buffered queries return an error dict and streaming queries yield an error chunk. |
| cache_ttl | float | 60.0 | Time-to-live in seconds for cached query results. A value of 0.0 effectively disables caching. |
| cache_max_size | int | 256 | Maximum number of entries in the in-memory TTL cache. When it is full, the oldest inserted entry is evicted. |
| stream_race_timeout | float | 2.0 | Seconds to wait during the RANKING streaming race. Sources that produce no first chunk within this window are excluded from the winner selection. |
| pre_query_hook | Callable[[str, Optional[Dict]], None] \| None | None | Called with (query, context) before every streamed query and before every buffered query that is not served from the cache. Use for logging, request-id injection, or rate-limit checks. |
| post_query_hook | Callable[[Dict[str, Any]], None] \| None | None | Called after every buffered query completes, receiving the full aggregated result dict. Not called on cache hits or during streaming. |
Returns: FederatedRAG instance.
Example:
```python
import logging

from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

logging.basicConfig(level=logging.INFO)


def before(query, ctx):
    print(f"[PRE] query={query!r} context={ctx}")


def after(result):
    # .get(): the documented error dict has no "cached" key
    print(f"[POST] sources={result['sources_used']} cached={result.get('cached')}")


fed = FederatedRAG(
    aggregation_method=AggregationMethod.RANKING,
    min_sources=2,
    cache_ttl=300.0,
    cache_max_size=512,
    stream_race_timeout=1.5,
    pre_query_hook=before,
    post_query_hook=after,
)
```

Source Management
add_source
```python
fed.add_source(
    name: str,
    rag_system: Any,
    weight: float = 1.0,
    timeout: float = 10.0,
    metadata: Optional[Dict] = None,
) -> FederatedRAG
```

Purpose: Registers a new RAG backend with the federation. The backend is immediately available for querying. Returns self to support method chaining.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | (required) | Unique identifier for this source within the federation. Used as the source field in results and StreamChunk objects. |
| rag_system | Any | (required) | The backend RAG object. Must expose at minimum a generate(query) method (see Backend Compatibility for full requirements). |
| weight | float | 1.0 | Relative priority of this source. Must be strictly positive (> 0). Higher-weight sources are preferred by the WEIGHTED, RANKING, and MERGE strategies and are tried first in streaming. |
| timeout | float | 10.0 | Per-source timeout in seconds. If the backend does not respond within this window, the source is skipped for this query and its circuit breaker records a failure. |
| metadata | Optional[Dict] | None | Arbitrary key-value pairs attached to the source. Passed as extra keyword arguments to backends whose methods declare `**kwargs`; never passed as the query argument. |
Returns: FederatedRAG — the same instance, enabling chaining.
Raises: ValueError if weight <= 0.
Example:
```python
# Chained registration (docs_rag, wiki_rag and arxiv_rag are your own backends)
fed = (
    FederatedRAG()
    .add_source("primary_docs", docs_rag, weight=2.0, timeout=5.0)
    .add_source("wiki", wiki_rag, weight=1.0, timeout=8.0)
    .add_source("arxiv", arxiv_rag, weight=0.5, timeout=15.0,
                metadata={"domain": "academic"})
)
```

remove_source
```python
fed.remove_source(name: str) -> None
```

Purpose: Permanently removes a registered source from the federation. After removal, the source is no longer queried, and all its accumulated statistics are discarded.

Parameters:

| Parameter | Type | Description |
|---|---|---|
| name | str | The name of the source to remove, as originally passed to add_source. |
Returns: None
Raises: KeyError if the source name does not exist.
Example:
```python
fed.remove_source("arxiv")
# "arxiv" will no longer be queried
```

enable_source
```python
fed.enable_source(name: str, enabled: bool = True) -> None
```

Purpose: Temporarily enables or disables a source without removing it. Disabled sources are excluded from all queries (buffered and streaming) but retain their configuration and statistics. This is useful for maintenance windows or A/B testing.

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | (required) | The name of the source to toggle, as originally passed to add_source. |
| enabled | bool | True | Pass True to re-enable the source or False to disable it. |
Returns: None
Raises: KeyError if the source name does not exist.
Example:
```python
# Disable a source during maintenance
fed.enable_source("arxiv", enabled=False)
result = fed.query("What is RAG?")  # Only docs and wiki are queried

# Re-enable after maintenance
fed.enable_source("arxiv", enabled=True)
```

get_stats
```python
fed.get_stats() -> Dict[str, Dict]
```

Purpose: Returns per-source operational statistics accumulated since the FederatedRAG instance was created (or since the source was registered). Useful for observability dashboards, alerting, and debugging slow or unreliable backends.

Parameters: None.

Returns: Dict[str, Dict], a dictionary keyed by source name. Each value is a stats dictionary with the following fields:

| Field | Type | Description |
|---|---|---|
| total_queries | int | Total number of times this source has been called (including failures). |
| total_errors | int | Number of calls that resulted in a timeout or exception. |
| avg_latency_ms | float | Mean response latency across all calls, in milliseconds. |
| error_rate | float | Ratio of total_errors / total_queries, in the range [0.0, 1.0]. |
| circuit_state | str | Current circuit breaker state: "closed", "open", or "half_open". |
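The derived fields in this table follow from three raw counters. A minimal sketch of that bookkeeping, assuming latency is recorded for every call including failures (the table says avg_latency_ms covers all calls) and rounding to one decimal place as in the output example further down; `SourceStats`, `record`, and `as_dict` are hypothetical names, not the library's internals.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class SourceStats:
    """Hypothetical per-source counters from which the get_stats() fields derive."""

    total_queries: int = 0
    total_errors: int = 0
    total_latency_ms: float = 0.0

    def record(self, latency_ms: float, ok: bool) -> None:
        # Every call counts, including timeouts and exceptions.
        self.total_queries += 1
        self.total_latency_ms += latency_ms
        if not ok:
            self.total_errors += 1

    def as_dict(self, circuit_state: str = "closed") -> Dict[str, Any]:
        n = self.total_queries
        return {
            "total_queries": n,
            "total_errors": self.total_errors,
            # Guard against division by zero before the first call.
            "avg_latency_ms": round(self.total_latency_ms / n, 1) if n else 0.0,
            "error_rate": self.total_errors / n if n else 0.0,
            "circuit_state": circuit_state,
        }


stats = SourceStats()
stats.record(200.0, ok=True)
stats.record(300.0, ok=True)
stats.record(1000.0, ok=False)
print(stats.as_dict())
```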
Example:
```python
stats = fed.get_stats()
for source_name, s in stats.items():
    print(
        f"{source_name}: "
        f"{s['total_queries']} calls | "
        f"avg {s['avg_latency_ms']} ms | "
        f"error rate {s['error_rate']:.1%} | "
        f"circuit {s['circuit_state']}"
    )
```

Output example:

```text
primary_docs: 120 calls | avg 243.7 ms | error rate 1.7% | circuit closed
wiki: 89 calls | avg 510.2 ms | error rate 5.6% | circuit closed
arxiv: 45 calls | avg 1420.0 ms | error rate 20.0% | circuit open
```

Buffered Query API
These methods wait for all sources to respond, aggregate the results, and return a single complete response dictionary. Use them when you need the full answer before rendering.
query
```python
fed.query(
    query: str,
    context: Optional[Dict] = None,
    top_k: int = 5,
) -> Dict[str, Any]
```

Purpose: Synchronous, buffered multi-source query. Internally runs query_async in the correct asyncio context, so it is safe to call both from synchronous code and from within an already-running event loop (e.g., Jupyter notebooks, FastAPI background tasks).
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | (required) | The natural-language question or search query to send to all active sources. |
| context | Optional[Dict] | None | Optional contextual metadata forwarded to backends that declare a context parameter. A "language" key is handled specially: its value is forwarded to backends that declare a language parameter. |
| top_k | int | 5 | Number of documents each source should retrieve before generating an answer. Forwarded to backends that declare a top_k parameter. |
Returns: Dict[str, Any] — See Return Value Reference.
Example:
```python
result = fed.query(
    "What are the main types of neural networks?",
    context={"language": "en", "user_id": "u-42"},
    top_k=10,
)
print(result["answer"])
print("Sources used:", result["sources_used"])
print("Cached?", result.get("cached"))  # error dicts have no "cached" key
```

query_async / aquery
```python
await fed.query_async(
    query: str,
    context: Optional[Dict] = None,
    top_k: int = 5,
) -> Dict[str, Any]

# Alias:
await fed.aquery(query, context, top_k)
```

Purpose: Async coroutine equivalent of query. Queries all active, circuit-closed sources concurrently via asyncio.gather, then aggregates and caches the result. This is the core query implementation; query delegates to it.
Parameters: Identical to query.
Returns: Dict[str, Any] — See Return Value Reference.
Behaviour details:
- Checks the TTL cache and returns immediately on a hit (with "cached": True).
- Calls pre_query_hook(query, context) if configured.
- Calls all active, circuit-healthy sources in parallel.
- Aggregates the successful results using the configured AggregationMethod.
- Writes the result to the TTL cache.
- Calls post_query_hook(result) if configured.
- Returns the aggregated dict.
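The steps above can be sketched end to end with plain asyncio. This is a simplified stand-in, not the library's code, and every name in it (`EchoRAG`, `call_source`, `federated_query`) is hypothetical. It assumes each backend exposes an async agenerate(query), uses asyncio.wait_for for the per-source timeout, keys a plain dict by the query string (a simplification of the hashed key described under Caching Behaviour), aggregates WEIGHTED-style, and does not cache error results (also an assumption).

```python
import asyncio
import time
from typing import Any, Callable, Dict, List, Optional, Tuple


class EchoRAG:
    """Hypothetical backend that answers after a fixed delay."""

    def __init__(self, reply: str, delay: float) -> None:
        self.reply, self.delay = reply, delay

    async def agenerate(self, query: str) -> str:
        await asyncio.sleep(self.delay)
        return self.reply


async def call_source(name: str, rag: Any, weight: float, timeout: float,
                      query: str) -> Optional[Dict[str, Any]]:
    """Call one backend; None means it timed out or raised, so it is skipped."""
    start = time.perf_counter()
    try:
        answer = await asyncio.wait_for(rag.agenerate(query), timeout)
    except Exception:  # includes asyncio.TimeoutError
        return None
    return {"answer": answer, "source": name, "weight": weight, "score": 0.0,
            "latency_ms": (time.perf_counter() - start) * 1000}


async def federated_query(query: str,
                          sources: List[Tuple[str, Any, float, float]],  # (name, rag, weight, timeout)
                          cache: Dict[str, Dict[str, Any]],
                          pre_hook: Optional[Callable[[str, Optional[dict]], None]] = None,
                          post_hook: Optional[Callable[[Dict[str, Any]], None]] = None,
                          ) -> Dict[str, Any]:
    # 1. Cache lookup: a hit returns at once and skips both hooks.
    if query in cache:
        return {**cache[query], "cached": True}
    # 2. Pre-query hook.
    if pre_hook:
        pre_hook(query, None)
    # 3. Query all sources concurrently.
    raw = await asyncio.gather(*(call_source(n, r, w, t, query) for n, r, w, t in sources))
    ok = [r for r in raw if r is not None]
    if not ok:
        return {"answer": "All sources failed or timed out", "error": "all_sources_failed",
                "sources_used": [], "num_sources": 0}
    # 4. Aggregate WEIGHTED-style: highest weight wins, score breaks ties.
    best = max(ok, key=lambda r: (r["weight"], r["score"]))
    result = {"answer": best["answer"], "sources_used": [r["source"] for r in ok],
              "num_sources": len(ok), "cached": False}
    # 5. Write to cache, 6. post-query hook, 7. return.
    cache[query] = result
    if post_hook:
        post_hook(result)
    return result


sources = [("fast", EchoRAG("fast answer", 0.01), 1.0, 1.0),
           ("heavy", EchoRAG("heavy answer", 0.01), 2.0, 1.0),
           ("slow", EchoRAG("too late", 0.5), 3.0, 0.05)]  # exceeds its 0.05 s timeout
cache: Dict[str, Dict[str, Any]] = {}
first = asyncio.run(federated_query("What is RAG?", sources, cache))
second = asyncio.run(federated_query("What is RAG?", sources, cache))
print(first["answer"], first["sources_used"], second["cached"])
```

The slow source is dropped by its own timeout without delaying the others, and the second call is served from the cache.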
Example:
```python
import asyncio

from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod


async def main():
    fed = FederatedRAG(aggregation_method=AggregationMethod.MERGE)
    fed.add_source("src_a", rag_a)  # rag_a, rag_b: your own backends
    fed.add_source("src_b", rag_b)
    result = await fed.query_async("Explain attention mechanisms", top_k=8)
    print(result["answer"])


asyncio.run(main())
```

aquery note: aquery is a direct alias for query_async, with identical behaviour. Use whichever reads more naturally in your codebase.
Streaming API
These methods yield StreamChunk objects incrementally as the backend produces tokens. Use them when you want to display text to the user in real-time (e.g., chat UIs, terminals, SSE endpoints).
stream
```python
fed.stream(
    query: str,
    context: Optional[Dict] = None,
    top_k: int = 5,
) -> Generator[StreamChunk, None, None]
```

Purpose: Synchronous streaming query. Yields StreamChunk objects one at a time as they arrive (which sources contribute depends on the AggregationMethod; see Streaming Strategies). Safe to use in both synchronous and async contexts (e.g., inside Jupyter cells).
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | (required) | The natural-language query to stream answers for. |
| context | Optional[Dict] | None | Optional context dict forwarded to backends. See query for details. |
| top_k | int | 5 | Number of documents to retrieve per source. |
Returns: Generator[StreamChunk, None, None], a standard Python generator. Each iteration yields one StreamChunk; the generator is exhausted once the underlying astream call has finished.

Internal mechanism: stream detects whether an event loop is already running. If no loop is running, it drives the async generator chunk by chunk on a fresh event loop, so text is delivered incrementally. If a loop is already running (e.g., in Jupyter), it collects all chunks in a background thread and yields them only after collection has finished, so output in that case is not incremental; use astream there for real-time display.
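Both paths can be sketched with the standard library alone. The helper below is hypothetical (`iterate_sync` is not a federated_rag name) and shows the general technique rather than the library's code: with no running loop it advances the async generator one `__anext__()` at a time on a private loop; with a running loop it drains the generator via asyncio.run in a worker thread, blocking until collection finishes, and only then yields.

```python
import asyncio
import threading
from typing import AsyncGenerator, Callable, Iterator, List, TypeVar

T = TypeVar("T")


def iterate_sync(make_agen: Callable[[], AsyncGenerator[T, None]]) -> Iterator[T]:
    """Hypothetical helper: consume an async generator from synchronous code."""
    try:
        asyncio.get_running_loop()
        loop_running = True
    except RuntimeError:
        loop_running = False

    if loop_running:
        # This thread's loop is busy: drain on a new loop in a worker thread,
        # then yield everything at once (not incremental).
        collected: List[T] = []

        async def drain() -> None:
            async for item in make_agen():
                collected.append(item)

        worker = threading.Thread(target=lambda: asyncio.run(drain()))
        worker.start()
        worker.join()
        yield from collected
        return

    # No running loop: step the generator one item at a time on a private
    # loop, so each item reaches the caller as soon as it is produced.
    loop = asyncio.new_event_loop()
    agen = make_agen()
    try:
        while True:
            try:
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.run_until_complete(agen.aclose())
        loop.close()


async def numbers() -> AsyncGenerator[int, None]:
    for i in range(3):
        await asyncio.sleep(0)
        yield i


async def inside_loop() -> List[int]:
    # Called from a coroutine, so a loop is running: the threaded path is used.
    return list(iterate_sync(numbers))


print(list(iterate_sync(numbers)))
print(asyncio.run(inside_loop()))
```

Note that the threaded path blocks the running loop while it waits, which is the cost of offering a synchronous API inside async code.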
Example:
```python
print("Answer: ", end="")
for chunk in fed.stream("Describe the transformer architecture", top_k=5):
    if chunk.error:
        print(f"\n[ERROR: {chunk.error}]")
    elif not chunk.done:
        print(chunk.text, end="", flush=True)
print()  # newline at end
```

astream
```python
fed.astream(
    query: str,
    context: Optional[Dict] = None,
    top_k: int = 5,
) -> AsyncGenerator[StreamChunk, None]
```

Purpose: Async streaming query and the primary streaming implementation. Yields StreamChunk objects asynchronously as the backend produces output. The behaviour depends on the configured AggregationMethod (see Streaming Strategies).
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | (required) | The natural-language query to stream answers for. |
| context | Optional[Dict] | None | Optional context dict forwarded to backends. See query for details. |
| top_k | int | 5 | Number of documents to retrieve per source. |
Returns: AsyncGenerator[StreamChunk, None] — An async generator. Use with async for.
Behaviour details:
- Calls pre_query_hook(query, context) if configured.
- Checks that enough active sources exist; if not, yields an error chunk and returns.
- Routes to the appropriate internal streaming strategy based on AggregationMethod.
- For each source, tries the native astream, stream, and generate_stream methods in that order. If the backend has none of them, falls back to `_call_generate` plus word-level synthetic chunks.
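The buffered fallback in the last step can be sketched as follows: one complete answer is cut into word-level chunks and closed with a terminal chunk. StreamChunk is re-declared locally with the documented fields; `synthetic_chunks` is a hypothetical name, and the splitting rule (each word plus its trailing whitespace, via re.findall) is an assumption, since the exact rule is not documented.

```python
import asyncio
import re
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List, Optional


@dataclass
class StreamChunk:
    # Local mirror of the documented StreamChunk fields.
    text: str
    source: str
    done: bool = False
    metadata: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


async def synthetic_chunks(answer: str, source: str) -> AsyncGenerator[StreamChunk, None]:
    """Emit each word (with its trailing whitespace) as a chunk, then a terminal chunk."""
    for piece in re.findall(r"\S+\s*", answer):
        yield StreamChunk(text=piece, source=source)
        await asyncio.sleep(0)  # let other tasks run between chunks
    yield StreamChunk(text="", source=source, done=True)


async def demo() -> List[StreamChunk]:
    return [c async for c in synthetic_chunks("RAG grounds answers in documents.", "wiki")]


chunks = asyncio.run(demo())
print([c.text for c in chunks])
```

Keeping the trailing whitespace on each piece means a consumer can simply concatenate chunk.text values to rebuild the answer.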
FastAPI streaming example (plain-text chunked response):

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
# `fed` is assumed to be a FederatedRAG instance configured at module level.


@app.get("/stream")
async def stream_answer(q: str):
    async def generate():
        async for chunk in fed.astream(q):
            if not chunk.done and not chunk.error:
                yield chunk.text

    return StreamingResponse(generate(), media_type="text/plain")
```

Metadata inspection example:
```python
async def print_with_latency(fed):
    async for chunk in fed.astream("What is BERT?"):
        if chunk.error:
            continue  # error sentinels are not normal completions
        if chunk.done:
            meta = chunk.metadata or {}
            print(f"\n✓ {chunk.source}: {meta.get('latency_ms')} ms")
        else:
            print(chunk.text, end="", flush=True)
```

Return Value Reference
On success, both query and query_async return a Dict[str, Any] with the following guaranteed keys, plus extras that depend on the aggregation method. Federation-level failures return the smaller error shape described under Error response keys.

Guaranteed keys (successful responses, all methods)

| Key | Type | Description |
|---|---|---|
| answer | str | The final answer text selected or combined by the aggregation strategy. |
| sources_used | List[str] | Names of all sources that responded successfully. |
| num_sources | int | Number of sources that responded successfully. |
| cached | bool | True if the result was served from the TTL cache, False if freshly computed. |
Error response keys

Returned when the federation itself fails. Failures of individual sources are not reported this way; such sources are silently skipped.

| Key | Type | Description |
|---|---|---|
| answer | str | Human-readable error message. |
| error | str | Machine-readable error code. See Error Codes. |
| sources_used | List | Always an empty list on federation-level errors. |
| num_sources | int | Always 0 on federation-level errors. |
Extra keys by AggregationMethod

| Method | Extra Keys |
|---|---|
| WEIGHTED | primary_source (str), all_results (List[Dict]), aggregation_method (str) |
| VOTING | primary_source (str), vote_counts (Dict[str, int]), all_results (List[Dict]), aggregation_method (str) |
| RANKING | primary_source (str), ranked_results (List[Dict]), aggregation_method (str) |
| MERGE | all_results (List[Dict]), aggregation_method (str) |
| SIMPLE | primary_source (str), all_results (List[Dict]), aggregation_method (str) |
Per-source result dict (inside all_results / ranked_results)

Each element represents one backend's contribution:

| Key | Type | Description |
|---|---|---|
| answer | str | The raw answer text from this backend. |
| source | str | The backend's registered name. |
| weight | float | The weight assigned to this source. |
| score | float | Relevance score from the backend's retriever (0.0 if not available). |
| latency_ms | float | Round-trip time to this source in milliseconds. |
Streaming Strategies by AggregationMethod
| AggregationMethod | Streaming Behaviour |
|---|---|
| WEIGHTED | Sources are sorted by weight (highest first). The federation streams from the first source that produces content without error. |
| SIMPLE | Same as WEIGHTED. |
| RANKING | All sources race for up to stream_race_timeout seconds. The source whose first chunk has the highest score × weight wins, and its remaining chunks are then streamed. |
| MERGE | Sources are streamed sequentially in descending weight order. Each source's output is prefixed with [source_name], and consecutive sources are separated by `\n\n`. |
| VOTING | Cannot stream by nature. Falls back to query_async, then emits a single StreamChunk with the full answer. |
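The RANKING race can be sketched with asyncio.wait_for around each source's first chunk. This simplified version is not the library's implementation: it assumes each source is an async generator factory yielding (text, score) pairs, that the winner is the source whose first chunk maximises score × weight among those that answered within the race timeout, and that losing generators are closed afterwards. `first_chunk`, `ranking_stream`, and `make` are hypothetical names.

```python
import asyncio
from typing import AsyncGenerator, Callable, List, Optional, Tuple

Chunk = Tuple[str, float]  # (text, score) yielded by each hypothetical source
Source = Tuple[str, float, Callable[[], AsyncGenerator[Chunk, None]]]  # (name, weight, factory)


async def first_chunk(agen: AsyncGenerator[Chunk, None], timeout: float) -> Optional[Chunk]:
    """First item of a stream, or None on timeout, error, or an empty stream."""
    try:
        return await asyncio.wait_for(agen.__anext__(), timeout)
    except Exception:  # TimeoutError, StopAsyncIteration, backend errors
        return None


async def ranking_stream(sources: List[Source],
                         race_timeout: float) -> AsyncGenerator[Tuple[str, str], None]:
    """Yield (source_name, text) pairs from the winner of the first-chunk race."""
    agens = [factory() for _, _, factory in sources]
    try:
        # Race: every source gets up to race_timeout seconds for a first chunk.
        firsts = await asyncio.gather(*(first_chunk(g, race_timeout) for g in agens))
        scored = [(first[1] * weight, i)
                  for i, ((_, weight, _), first) in enumerate(zip(sources, firsts))
                  if first is not None]
        if not scored:
            return  # no source answered in time
        _, winner = max(scored)
        name = sources[winner][0]
        yield name, firsts[winner][0]
        # Continue streaming only the winner's remaining chunks.
        async for text, _score in agens[winner]:
            yield name, text
    finally:
        for g in agens:
            await g.aclose()  # stop losers (and the winner if the consumer quits early)


def make(chunks: List[Chunk], delay: float) -> Callable[[], AsyncGenerator[Chunk, None]]:
    async def gen() -> AsyncGenerator[Chunk, None]:
        for item in chunks:
            await asyncio.sleep(delay)
            yield item
    return gen


demo_sources: List[Source] = [
    ("docs", 1.0, make([("Docs: hello. ", 0.9), ("More docs.", 0.9)], 0.01)),  # 0.9 x 1.0
    ("wiki", 2.0, make([("Wiki: hello. ", 0.6), ("More wiki.", 0.6)], 0.01)),  # 0.6 x 2.0
    ("slow", 5.0, make([("Too slow.", 1.0)], 0.5)),  # misses the 0.1 s race
]


async def main() -> List[Tuple[str, str]]:
    return [item async for item in ranking_stream(demo_sources, race_timeout=0.1)]


print(asyncio.run(main()))
```

Here wiki wins (1.2 beats 0.9) even though docs has the higher raw score, and the heavily weighted slow source is excluded because it missed the race window.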
Backend Compatibility
FederatedRAG uses signature-aware introspection to call backends. It never passes a parameter the backend's method does not declare, preventing TypeError on mixed-capability backends.
Minimum required interface
```python
from __future__ import annotations


class MyRAGBackend:
    def generate(self, query: str) -> str | dict:
        ...
```

Full interface (all capabilities unlocked)
```python
from __future__ import annotations

from typing import Any, AsyncIterable, Iterable, List, Tuple

Document = Any  # placeholder for whatever document type your retriever returns


class FullRAGBackend:
    def retrieve(self, query: str, top_k: int = 5) -> List[Tuple[Document, float]]:
        """Returns list of (document, score) tuples."""
        ...

    async def aretrieve(self, query: str, top_k: int = 5) -> List[Tuple[Document, float]]:
        """Async variant of retrieve."""
        ...

    def generate(self, query: str, language: str = "en") -> str | dict:
        """Generate an answer. Return str or dict with at least {"answer": str}."""
        ...

    async def agenerate(self, query: str, language: str = "en") -> str | dict:
        """Async variant of generate."""
        ...

    def stream(self, query: str, top_k: int = 5) -> Iterable[str | dict]:
        """Yield text fragments or {"text": str, "score": float} dicts."""
        ...

    async def astream(self, query: str, top_k: int = 5) -> AsyncIterable[str | dict]:
        """Async variant of stream."""
        ...
```

Method resolution priority
| Operation | Priority order |
|---|---|
| Async generation | agenerate → generate (coroutine) → generate (sync, run in thread) |
| Retrieval | aretrieve (awaited) → retrieve (sync, run in executor) |
| Streaming | astream → stream → generate_stream → buffered fallback |
context and language forwarding

- If a backend's method declares a context parameter and context is not None, context is forwarded.
- If a backend's method declares a language parameter and context is a dict with a "language" key, that value is extracted and forwarded as language=<value>.
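Signature-aware calling amounts to filtering keyword arguments against the backend method's signature. A minimal sketch with inspect.signature; `call_with_supported_kwargs` is a hypothetical helper, forwarding top_k only when declared follows the top_k description under query, passing source metadata only to `**kwargs` methods follows the add_source metadata description, and skipping a metadata key named "query" is an assumption based on "never passed as the query argument".

```python
import inspect
from typing import Any, Callable, Dict, Optional


def call_with_supported_kwargs(fn: Callable[..., Any], query: str,
                               context: Optional[Dict[str, Any]] = None, top_k: int = 5,
                               source_metadata: Optional[Dict[str, Any]] = None) -> Any:
    """Call fn(query, ...) passing only keyword arguments that fn can accept."""
    params = inspect.signature(fn).parameters
    has_var_kw = any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())

    kwargs: Dict[str, Any] = {}
    if "top_k" in params:
        kwargs["top_k"] = top_k
    if "context" in params and context is not None:
        kwargs["context"] = context
    if "language" in params and isinstance(context, dict) and "language" in context:
        kwargs["language"] = context["language"]
    if has_var_kw and source_metadata:
        # Extra metadata only reaches **kwargs backends and never replaces `query`.
        for key, value in source_metadata.items():
            if key != "query":
                kwargs.setdefault(key, value)
    return fn(query, **kwargs)


def basic_generate(query):
    return f"basic: {query}"


def lang_generate(query, language="en"):
    return f"{language}: {query}"


def flexible_generate(query, top_k=5, **kwargs):
    return f"top_k={top_k} extra={sorted(kwargs)}"


ctx = {"language": "ar", "user_id": "u-42"}
print(call_with_supported_kwargs(basic_generate, "q", ctx))
print(call_with_supported_kwargs(lang_generate, "q", ctx))
print(call_with_supported_kwargs(flexible_generate, "q", ctx, top_k=3,
                                 source_metadata={"domain": "academic"}))
```

None of the three backends receives an argument it did not declare, so mixing them in one federation never raises a TypeError.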
Circuit Breaker Behaviour
Each registered source has its own independent CircuitBreaker. The federation automatically skips sources whose circuit is open.
State machine
```text
CLOSED    ──(≥ failure_threshold consecutive failures)──▶ OPEN
OPEN      ──(recovery_timeout elapsed)──────────────────▶ HALF_OPEN
HALF_OPEN ──(probe succeeds)────────────────────────────▶ CLOSED
HALF_OPEN ──(probe fails)───────────────────────────────▶ OPEN (recovery timer restarts)
```

Default thresholds
| Parameter | Default | Description |
|---|---|---|
| failure_threshold | 3 | Consecutive failures before the circuit opens. |
| recovery_timeout | 30.0 s | Seconds before an open circuit transitions to HALF_OPEN and allows a probe request. |
Effect on queries
- CLOSED: the source is queried normally.
- OPEN: the source is excluded from the query entirely; no network call is made.
- HALF_OPEN: one probe request is allowed. Success moves the circuit to CLOSED; failure moves it back to OPEN and restarts the recovery timer.
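The state machine and default thresholds can be modelled as a small class. This is an illustrative model, not the library's CircuitBreaker: the method names (`allow_request`, `record_success`, `record_failure`) and the injectable `clock`, used here so the example runs without waiting 30 seconds, are assumptions.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Illustrative breaker: CLOSED -> OPEN -> HALF_OPEN, with the documented defaults."""

    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock  # injectable so examples need not sleep
        self.state = "closed"
        self.failures = 0  # consecutive failures
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        """True if the source may be called now."""
        if self.state == "open" and self.clock() - self.opened_at >= self.recovery_timeout:
            self.state = "half_open"
            return True  # the single probe request
        return self.state == "closed"

    def record_success(self) -> None:
        self.failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()  # (re)start the recovery timer


now = [0.0]  # fake clock
cb = CircuitBreaker(clock=lambda: now[0])
for _ in range(3):
    cb.record_failure()
print(cb.state, cb.allow_request())  # open False
now[0] = 31.0
print(cb.allow_request(), cb.state)  # True half_open
cb.record_failure()                  # probe failed: back to OPEN, timer restarts at 31 s
print(cb.state)                      # open
now[0] = 62.0
cb.allow_request()                   # 31 s since reopening: probe allowed again
cb.record_success()
print(cb.state)                      # closed
```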
Monitor circuit states via get_stats():

```python
for name, s in fed.get_stats().items():
    if s["circuit_state"] == "open":
        print(f"⚠️ {name} is circuit-open: check that backend!")
```

Caching Behaviour
FederatedRAG uses a thread-safe, in-memory TTL cache (TTLCache) to avoid redundant backend calls for identical queries.
Cache key
The cache key is a SHA-256 hash of f"{query}|{sorted(context.items())}". Two calls are cache-equivalent only if both query and all context key-value pairs match exactly.
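A sketch of the key derivation as described, using hashlib. Treating context=None as an empty dict is an assumption (the description does not say how a missing context is handled), and `cache_key` is a hypothetical name.

```python
import hashlib
from typing import Any, Dict, Optional


def cache_key(query: str, context: Optional[Dict[str, Any]] = None) -> str:
    """SHA-256 over f"{query}|{sorted(context.items())}" (None treated as {})."""
    items = sorted((context or {}).items())
    return hashlib.sha256(f"{query}|{items}".encode("utf-8")).hexdigest()


a = cache_key("What is RAG?", {"language": "en", "user_id": "u-42"})
b = cache_key("What is RAG?", {"user_id": "u-42", "language": "en"})  # same pairs, other order
c = cache_key("What is RAG?", {"language": "ar", "user_id": "u-42"})
print(a == b, a == c)
```

Sorting the items makes the key independent of dict insertion order. As described, top_k is not part of the key, so calls that differ only in top_k would map to the same cache entry.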
Cache hit behaviour
- Hits are returned immediately with "cached": True.
- post_query_hook is not called on cache hits.
- pre_query_hook is not called on cache hits.
- Streaming (stream / astream) does not check the cache; streams are always live.
Invalidation
There is no public method to invalidate individual entries. The entire cache can be cleared via:
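```python
fed._cache.invalidate()  # clears all entries
```

Because _cache is a private attribute, this call is not part of the public API and may change between versions. Entries also expire naturally cache_ttl seconds after they were written.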
Eviction
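When the cache reaches cache_max_size entries, the entry with the oldest insertion timestamp is evicted. Eviction is based on insertion time, not access time, so the policy is first-in-first-out rather than LRU: reading an entry does not protect it from eviction.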
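Expiry and insertion-order eviction can be sketched with an OrderedDict and a lock. Illustrative only: `SimpleTTLCache` and its `get`/`set` methods are hypothetical (only `invalidate()` appears in this reference), and the injectable `clock` exists so the example runs without sleeping. With ttl=0.0 every lookup counts as expired, which matches the note that a cache_ttl of 0.0 effectively disables caching.

```python
import threading
import time
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple


class SimpleTTLCache:
    """Thread-safe TTL cache that evicts the oldest-inserted entry when full."""

    def __init__(self, ttl: float = 60.0, max_size: int = 256,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl, self.max_size, self.clock = ttl, max_size, clock
        self._data: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()
        self._lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return None
            written_at, value = entry
            if self.clock() - written_at >= self.ttl:
                del self._data[key]  # expired
                return None
            return value  # reads do not change the eviction order

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            self._data.pop(key, None)  # rewriting a key counts as a fresh insertion
            if len(self._data) >= self.max_size:
                self._data.popitem(last=False)  # evict the oldest insertion
            self._data[key] = (self.clock(), value)

    def invalidate(self) -> None:
        with self._lock:
            self._data.clear()


now = [0.0]
cache = SimpleTTLCache(ttl=60.0, max_size=2, clock=lambda: now[0])
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")  # a read does not protect "a" from eviction
cache.set("c", 3)  # full: "a" (oldest insertion) is evicted
print(cache.get("a"), cache.get("b"), cache.get("c"))  # None 2 3
now[0] = 61.0
print(cache.get("b"))  # None (expired)
```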
Error Codes
| Code | Trigger condition | answer content |
|---|---|---|
| "insufficient_sources" | Fewer active sources are available than min_sources. | "Need ≥ N active source(s), found M" |
| "all_sources_failed" | All sources returned None (timed out or raised exceptions). | "All sources failed or timed out" |
Error dicts have the shape:

```python
{
    "answer": "<human-readable message>",
    "error": "<error_code>",
    "sources_used": [],
    "num_sources": 0,
}
```

Complete Examples
Example 1: Basic multi-source query with stats

```python
import logging

from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

logging.basicConfig(level=logging.INFO)

fed = FederatedRAG(aggregation_method=AggregationMethod.RANKING)
fed.add_source("docs", docs_rag, weight=2.0, timeout=5.0)  # docs_rag, wiki_rag: your own backends
fed.add_source("wiki", wiki_rag, weight=1.0, timeout=8.0)

result = fed.query("What is retrieval-augmented generation?", top_k=5)
if "error" in result:
    print("Federation error:", result["error"])
else:
    print("Answer:", result["answer"])
    print("Primary source:", result.get("primary_source"))
    print("Ranked sources:", [r["source"] for r in result.get("ranked_results", [])])

print("\nSource Stats:")
for src, s in fed.get_stats().items():
    print(f"  {src}: {s['avg_latency_ms']} ms avg, {s['error_rate']:.0%} error rate")
```

Example 2: Async streaming in a FastAPI endpoint
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

app = FastAPI()

fed = FederatedRAG(aggregation_method=AggregationMethod.WEIGHTED)
fed.add_source("primary", primary_rag, weight=2.0)  # primary_rag, fallback_rag: your own backends
fed.add_source("fallback", fallback_rag, weight=0.5)


@app.get("/ask")
async def ask(q: str, lang: str = "en"):
    async def token_stream():
        ctx = {"language": lang}
        async for chunk in fed.astream(q, context=ctx, top_k=5):
            if chunk.error:  # check errors first: an error sentinel may also be terminal
                yield f"\n[error from {chunk.source}: {chunk.error}]"
            elif chunk.done:
                yield f"\n[done: {chunk.source}]"
            else:
                yield chunk.text

    return StreamingResponse(token_stream(), media_type="text/plain")
```

Example 3: MERGE strategy for research aggregation
```python
import asyncio

from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod


async def research(question: str):
    fed = FederatedRAG(aggregation_method=AggregationMethod.MERGE)
    fed.add_source("papers", papers_rag, weight=3.0)  # papers_rag, textbooks_rag, web_rag: your own backends
    fed.add_source("textbooks", textbooks_rag, weight=2.0)
    fed.add_source("web", web_rag, weight=1.0)

    result = await fed.query_async(question, top_k=10)
    # result["answer"] will contain [papers] ...\n\n[textbooks] ...\n\n[web] ...
    return result["answer"]


answer = asyncio.run(research("Explain the attention mechanism in transformers"))
print(answer)
```

Example 4: Dynamic source management
```python
from fennec_community.rag.types.federated_rag import FederatedRAG

fed = FederatedRAG()
fed.add_source("src_a", rag_a, weight=1.0)  # rag_a ... rag_d: your own backends
fed.add_source("src_b", rag_b, weight=1.5)
fed.add_source("src_c", rag_c, weight=0.8)

# Disable a source for maintenance
fed.enable_source("src_c", enabled=False)
result = fed.query("Who invented the internet?")  # Only src_a and src_b queried

# Re-enable after maintenance
fed.enable_source("src_c", enabled=True)

# Add a new source at runtime
fed.add_source("src_d", rag_d, weight=2.0)

# Remove a decommissioned source
fed.remove_source("src_a")

print(f"Active federation: {fed}")  # FederatedRAG(sources=['src_b', 'src_c', 'src_d'], ...)
print(f"Total sources: {len(fed)}")
```

Example 5: Lifecycle hooks for observability
```python
import time

from fennec_community.rag.types.federated_rag import FederatedRAG

request_log = []


def pre_hook(query: str, context):
    request_log.append({"query": query, "ts": time.time(), "status": "started"})
    print(f"[METRIC] query started: {query[:50]!r}")


def post_hook(result: dict):
    request_log.append({
        "sources": result["sources_used"],
        "cached": result.get("cached"),  # error dicts have no "cached" key
        "ts": time.time(),
        "status": "done",
    })
    print(f"[METRIC] query done | sources={result['sources_used']} cached={result.get('cached')}")


fed = FederatedRAG(
    pre_query_hook=pre_hook,
    post_query_hook=post_hook,
)
fed.add_source("main", my_rag)  # my_rag: your own backend
fed.query("What is machine learning?")
```

Simple Real Example
```python
from fennec_community.llm import MistralInterface
from fennec_community.document_loaders import TextLoader
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem
from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

llm_api = "YOUR_MISTRAL_API_KEY"  # placeholder: supply your own Mistral API key

chunker = ArabicTextChunker(chunk_size=100, overlap=20)
embedder = OllamaEmbedder()
vector_db = FAISSVectorDatabase(embedder=embedder)
llm = MistralInterface(api_key=llm_api)
context_manager = ContextManager()

# Source 1 (technical docs). English: "Python is a high-level, general-purpose programming
# language designed to be easy to read and write. It supports procedural, object-oriented
# and functional programming and is widely used in data science, AI, web development and
# task automation. It has a large standard library and an active community that provides
# thousands of ready-made packages via PyPI."
vdb1 = FAISSVectorDatabase(embedder=embedder)
rag_tech = RAGSystem(vector_db=vdb1, llm=llm, chunker=chunker, context_manager=context_manager)
rag_tech.add_text(
    text="بايثون لغة برمجة عالية المستوى متعددة الاستخدامات، صُمِّمت لتكون سهلة القراءة والكتابة. "
         "تدعم البرمجة الإجرائية والكائنية والوظيفية. تُستخدم على نطاق واسع في علم البيانات، "
         "والذكاء الاصطناعي، وتطوير الويب، وأتمتة المهام. تتميز بمكتبة قياسية ضخمة ومجتمع "
         "نشط يوفر آلاف الحزم الجاهزة عبر PyPI.",
    doc_id="tech_python",
    metadata={"source": "tech_docs", "category": "programming"},
)

# Source 2 (encyclopedia). English: "AI research formally began at the Dartmouth conference
# in 1956. The field went through two AI winters due to funding cuts, revived in the early
# 2000s thanks to big data and computing power, then boomed with large language models."
vdb2 = FAISSVectorDatabase(embedder=embedder)
rag_wiki = RAGSystem(vector_db=vdb2, llm=llm, chunker=chunker, context_manager=context_manager)
rag_wiki.add_text(
    text="بدأت بحوث الذكاء الاصطناعي رسمياً في مؤتمر دارتموث عام 1956. "
         "مرّ المجال بفترتين من شتاء الذكاء الاصطناعي بسبب توقف التمويل. "
         "انتعش مجدداً مطلع الألفية الثالثة بفضل البيانات الضخمة والقدرة الحوسبية، "
         "ثم شهد طفرة مع ظهور نماذج اللغة الكبيرة.",
    doc_id="wiki_ai_history",
    metadata={"source": "wikipedia", "category": "history"},
)

# Source 3 (internal). English: "Our company TechAI Solutions was founded in 2020 and
# specialises in AI solutions. We serve more than 150 clients in finance, healthcare and
# retail. Our team has 80 employees, 45 of them engineers and researchers."
vdb3 = FAISSVectorDatabase(embedder=embedder)
rag_internal = RAGSystem(vector_db=vdb3, llm=llm, chunker=chunker, context_manager=context_manager)
rag_internal.add_text(
    text="شركتنا TechAI Solutions تأسست عام 2020 وتتخصص في حلول الذكاء الاصطناعي. "
         "نخدم أكثر من 150 عميلاً في قطاعات المال والصحة والتجزئة. "
         "يضم فريقنا 80 موظفاً منهم 45 مهندساً وباحثاً.",
    doc_id="internal_company_overview",
    metadata={"source": "internal", "category": "company_info"},
)

fed = FederatedRAG(
    aggregation_method=AggregationMethod.WEIGHTED,
    min_sources=1,
    cache_ttl=120.0,
    stream_race_timeout=3.0,
)

(
    fed.add_source("tech_docs", rag_tech, weight=2.0, timeout=15.0)
    .add_source("wikipedia", rag_wiki, weight=1.0, timeout=10.0)
    .add_source("internal", rag_internal, weight=1.5, timeout=8.0)
)

print(f"Number of sources: {len(fed)}")
print(repr(fed))

result = fed.query("ما هي لغة بايثون؟", top_k=5)  # "What is the Python language?"
print(result)
```
community/rag/federated_rag.md