Streaming RAG — `streaming_rag` Module — Public API Reference
Table of Contents
- Module Overview
- StreamConfig
- StreamingRAG
- StreamEvent
- EventType Enumeration
- Protocols
- Generation Strategy Selection
- Event Stream Lifecycle
- Error Handling Reference
- Quick-Start Example
1. Module Overview
The streaming_rag package wraps any existing RAG backend and adds real-time token streaming with a rich event model. Instead of waiting for a full answer to be assembled before returning it, the system emits each word or token as it is produced, interleaved with structured metadata events (retrieval started, retrieval done, generation started, generation done, error).
Key capabilities:
- Three-strategy generation: native async LLM stream → sync LLM stream (thread-bridged) → simulated word-by-word fallback, selected automatically at runtime.
- Dual surface API: `stream()` / `stream_events()` for synchronous callers; `astream()` / `astream_events()` for async callers. All share the same underlying async implementation.
- Rich `StreamEvent` model: every event carries a type, payload, and latency timestamp, giving consumers fine-grained control.
- Configurable timeouts: independent retrieval and generation deadlines with graceful `ERROR` events on timeout.
- Callback hooks: `on_chunk` and `on_event` fire on every chunk and every event respectively, enabling side-effect patterns (logging, databases, WebSocket push) without modifying the consumer loop.
- Runtime statistics: cumulative stream count, chunk count, error count, and average latency.
- Thread-safe sync bridge: `stream_events()` spawns a private daemon thread with a stdlib `queue.Queue` (blocking, no busy-wait) so the async generator can be consumed from synchronous code safely.
Publicly exported symbols:
from fennec_community.rag.types.streaming_rag import (
    StreamingRAG,
    StreamConfig,
    SyncStreamableLLM,
    AsyncStreamableLLM,
    EventType,
)

2. StreamConfig
from fennec_community.rag.types.streaming_rag import StreamConfig

StreamConfig is a dataclass that centralises all tunable parameters for a StreamingRAG instance. All fields are optional; the defaults are production-ready for most use cases.
Constructor
StreamConfig(
    chunk_size: int = 10,
    sim_delay: float = 0.04,
    max_context_docs: int = 5,
    max_doc_chars: int = 500,
    emit_metadata: bool = True,
    retrieval_timeout: float = 10.0,
    generation_timeout: float = 60.0,
    prompt_template: str = <built-in>,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| `chunk_size` | `int` | `10` | Number of words per simulated chunk when the word-by-word fallback simulator (Strategy 3) is used. Not applied to native streaming LLMs. |
| `sim_delay` | `float` | `0.04` | Seconds of `asyncio.sleep` between simulated chunks. Controls the perceived streaming speed in fallback mode. |
| `max_context_docs` | `int` | `5` | Maximum number of retrieved documents injected into the prompt context. Documents beyond this limit are silently discarded. |
| `max_doc_chars` | `int` | `500` | Each retrieved document is truncated to this many characters before being inserted into the context block. |
| `emit_metadata` | `bool` | `True` | Reserved for future use: intended to gate whether non-CHUNK events are emitted. Currently all events are emitted regardless of this flag. |
| `retrieval_timeout` | `float` | `10.0` | Maximum seconds allowed for the entire retrieval step. Exceeding this emits an ERROR event and terminates the stream. |
| `generation_timeout` | `float` | `60.0` | Maximum wall-clock seconds allowed for the generation step. A sliding deadline is applied per token, not just at the start. |
| `prompt_template` | `str` | Built-in | Python format string with `{context}` and `{query}` placeholders. The built-in template instructs the LLM to answer strictly from the provided context and to respond in the same language as the question. |

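
The interplay of `max_context_docs`, `max_doc_chars`, and `prompt_template` can be pictured with a minimal sketch. The helper name `build_prompt`, the document shape (a dict with a `"content"` key), and the blank-line separator are assumptions made for illustration; the module's internal implementation may differ.

```python
from typing import Dict, List

# Hypothetical stand-in for a custom StreamConfig.prompt_template.
TEMPLATE = "Context:\n{context}\n\nQuestion: {query}\n\nAnswer concisely:"


def build_prompt(
    docs: List[Dict[str, str]],
    query: str,
    max_context_docs: int = 5,
    max_doc_chars: int = 500,
    template: str = TEMPLATE,
) -> str:
    """Keep the first N documents, truncate each one, and fill the template."""
    kept = docs[:max_context_docs]                        # extra documents are dropped
    parts = [d["content"][:max_doc_chars] for d in kept]  # per-document truncation
    context = "\n\n".join(parts)                          # separator is an assumption
    return template.format(context=context, query=query)


docs = [{"content": "#" * 1000} for _ in range(8)]
prompt = build_prompt(docs, "What is RAG?", max_context_docs=2, max_doc_chars=10)
```

With eight 1000-character documents and limits of 2 and 10, only 20 characters of retrieved text reach the prompt, which is why raising either limit directly increases prompt size.
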
Built-in prompt template:
You are a precise RAG assistant.
Strict rules:
- Use ONLY the provided context.
- Do NOT guess.
- If missing info, say: 'no information to answer '.
- The answer MUST be in the SAME language as the question.
- NEVER switch language.
Context:
{context}
Question: {query}
Final Answer:

Example:
from fennec_community.rag.types.streaming_rag import StreamConfig
config = StreamConfig(
    chunk_size=5,
    max_context_docs=8,
    max_doc_chars=800,
    retrieval_timeout=15.0,
    generation_timeout=90.0,
    prompt_template=(
        "Context:\n{context}\n\n"
        "Question: {query}\n\n"
        "Answer concisely:"
    ),
)

3. StreamingRAG
from fennec_community.rag.types.streaming_rag import StreamingRAG

StreamingRAG is the main class of the package. It wraps any RAG backend and adds real-time streaming generation with a full event lifecycle.
3.1 Constructor
StreamingRAG(
    rag_system: Any,
    llm: Any = None,
    config: Optional[StreamConfig] = None,
    *,
    chunk_size: Optional[int] = None,
    on_chunk: Optional[Callable[[str], None]] = None,
    on_event: Optional[Callable[[StreamEvent], None]] = None,
)

Purpose: Instantiate a streaming RAG system by binding it to an existing RAG backend and optionally providing an LLM capable of streaming. The config object is shallow-copied at construction time, so later mutations to the original StreamConfig do not affect this instance.
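
The copy-then-override behaviour described above can be sketched with stand-in classes. `DemoConfig` and `DemoStreamer` are hypothetical names that model only the config handling; they are not the library's classes.

```python
import copy
from dataclasses import dataclass


@dataclass
class DemoConfig:
    """Hypothetical stand-in for StreamConfig with two of its fields."""
    chunk_size: int = 10
    sim_delay: float = 0.04


class DemoStreamer:
    """Hypothetical stand-in that shows only the config handling."""

    def __init__(self, config=None, *, chunk_size=None):
        # Copy so that later mutation of the caller's object has no effect here.
        self.config = copy.copy(config) if config is not None else DemoConfig()
        if chunk_size is not None:
            self.config.chunk_size = chunk_size  # keyword-only override after copying


shared = DemoConfig(chunk_size=4)
streamer = DemoStreamer(shared, chunk_size=7)
shared.sim_delay = 1.0  # mutate the original after construction
```

The override lands on the private copy, so `shared.chunk_size` stays 4 and the later change to `shared.sim_delay` is not seen by `streamer`. A shallow copy is sufficient here because the documented fields are all scalars or strings.
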
| Parameter | Type | Required | Description |
|---|---|---|---|
| `rag_system` | `Any` | Yes | The underlying RAG backend. Must expose `.retrieve(query) -> Any` (sync or async). Optionally exposes `.generate(query, context)`, returning `str` or `Dict`, for fallback (Strategy 3) generation. |
| `llm` | `Any` | No | Language model instance. Checked against the `AsyncStreamableLLM` (preferred) and `SyncStreamableLLM` protocols at runtime. Falls back to `rag_system.llm` if `None`. If no LLM is resolvable, Strategy 3 (word-by-word simulation) is used. |
| `config` | `Optional[StreamConfig]` | No | Streaming configuration object. A `StreamConfig()` with all defaults is created if `None`. |
| `chunk_size` | `Optional[int]` | No | Keyword-only shortcut to override `config.chunk_size` after copying. Useful for quick one-off overrides without creating a full `StreamConfig`. |
| `on_chunk` | `Optional[Callable[[str], None]]` | No | Callback invoked synchronously on every CHUNK token before it is yielded. Exceptions in the callback are not caught; the caller is responsible for any error handling inside the callback. |
| `on_event` | `Optional[Callable[[StreamEvent], None]]` | No | Callback invoked synchronously on every `StreamEvent` (including non-CHUNK events). Fires before the event is yielded to the consumer. |

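
Because exceptions raised inside `on_chunk` are not caught, one option is to wrap the callback before passing it in. The decorator below is a sketch of that idea; `guarded` and `flaky_sink` are hypothetical names, and logging the failure is only one possible policy.

```python
import logging
from typing import Callable, List

logger = logging.getLogger("streaming_callbacks")


def guarded(callback: Callable[[str], None]) -> Callable[[str], None]:
    """Wrap a callback so that its exceptions are logged instead of raised."""

    def wrapper(chunk: str) -> None:
        try:
            callback(chunk)
        except Exception:
            logger.exception("on_chunk callback failed; continuing the stream")

    return wrapper


received: List[str] = []


def flaky_sink(chunk: str) -> None:
    """Hypothetical sink that rejects empty chunks."""
    if not chunk:
        raise ValueError("empty chunk")
    received.append(chunk)


safe_sink = guarded(flaky_sink)
for piece in ["Hello ", "", "world"]:
    safe_sink(piece)  # the empty chunk is logged, not raised
```

A wrapped callback such as `safe_sink` could then be supplied as `on_chunk=safe_sink`, keeping a faulty side effect from interrupting the stream.
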
Internal state initialised:
| Attribute | Type | Description |
|---|---|---|
| `_total_streams` | `int` | Count of `astream_events()` calls. |
| `_total_chunks` | `int` | Count of CHUNK events emitted across all streams. |
| `_total_errors` | `int` | Count of ERROR events emitted. |
| `_total_latency` | `float` | Cumulative elapsed seconds across all streams (used to compute `avg_stream_latency_s`). |

Example:
from fennec_community.rag.types.streaming_rag import StreamingRAG, StreamConfig
def save_chunk(chunk: str):
    db.append_token(chunk)  # `db` is a placeholder for your own storage layer

config = StreamConfig(max_context_docs=6, retrieval_timeout=8.0)
rag = StreamingRAG(
    rag_system=my_rag,  # placeholder: your existing RAG backend
    llm=my_llm,         # placeholder: your streaming-capable LLM
    config=config,
    on_chunk=save_chunk,
)

3.2 Synchronous Streaming API
stream()
rag.stream(
    query: str,
    context: Optional[Dict] = None,
) -> Iterator[str]

Purpose: Synchronous generator that yields plain text chunks only. All metadata events (RETRIEVAL_START, RETRIEVAL_DONE, GENERATION_START, GENERATION_DONE, ERROR) are consumed internally, but the callbacks still fire: on_chunk for each chunk and on_event for every event. This is the simplest consumer API for cases where only the answer text is needed.
Implementation: Internally calls stream_events() and filters for event.is_chunk.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question. |
| `context` | `Optional[Dict]` | No | Supplemental key-value context merged into the generation call (Strategy 3 only; forwarded to `rag_system.generate(query, context)`). |

Yields: str — individual text chunks in the order they are produced by the LLM or simulator.
Example:
print("Answer: ", end="")
for chunk in rag.stream("What is retrieval-augmented generation?"):
    print(chunk, end="", flush=True)
print()  # newline after stream

stream_events()
rag.stream_events(
    query: str,
    context: Optional[Dict] = None,
) -> Iterator[StreamEvent]

Purpose: Synchronous generator that yields all StreamEvent objects, both metadata events and CHUNK events, in the order they occur. This is the richest synchronous consumer API, giving access to retrieval metadata, timing, and error events.
Implementation: Spawns a private daemon thread that runs a new asyncio event loop. The async generator astream_events() is executed inside that loop. Events are passed to the calling thread through a thread-safe stdlib queue.Queue with blocking get() (no polling, no busy-wait). A None sentinel is enqueued after the last event to signal completion.
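
The thread-plus-queue bridge described above can be sketched in a few lines. This is an illustration of the pattern under stated assumptions, not the module's source: `iterate_in_thread` and `demo_events` are hypothetical names, and error propagation from the worker thread is deliberately left out.

```python
import asyncio
import queue
import threading
from typing import Any, AsyncIterator, Callable, Iterator

_DONE = None  # sentinel marking the end of the stream


def iterate_in_thread(make_agen: Callable[[], AsyncIterator[Any]]) -> Iterator[Any]:
    """Consume an async generator from synchronous code via a worker thread."""
    q: "queue.Queue[Any]" = queue.Queue()

    async def pump() -> None:
        async for item in make_agen():
            q.put(item)

    def worker() -> None:
        try:
            asyncio.run(pump())  # fresh event loop owned by this thread
        finally:
            q.put(_DONE)         # always unblock the consumer

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()           # blocking get: no polling loop
        if item is _DONE:
            return
        yield item


async def demo_events() -> AsyncIterator[str]:
    """Hypothetical async source standing in for astream_events()."""
    for word in ("alpha", "beta", "gamma"):
        await asyncio.sleep(0)
        yield word


collected = list(iterate_in_thread(demo_events))
```

Using `None` as the sentinel works only because real events are never `None`; a dedicated sentinel object would be the safer choice if that could not be guaranteed.
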
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question. |
| `context` | `Optional[Dict]` | No | Supplemental context forwarded to generation (Strategy 3 only). |

Yields: StreamEvent — all events in lifecycle order (see §8 Event Stream Lifecycle).
Example:
for event in rag.stream_events("Explain neural networks"):
    if event.is_chunk:
        print(event.data, end="", flush=True)
    elif event.type == EventType.RETRIEVAL_DONE:
        print(f"\n[Retrieved {event.data['num_docs']} docs in {event.latency:.2f}s]")
    elif event.type == EventType.ERROR:
        print(f"\n[Error at {event.data['stage']}: {event.data['msg']}]")
        break

3.3 Non-Streaming Convenience
query()
rag.query(
    query: str,
    context: Optional[Dict] = None,
) -> Dict[str, Any]

Purpose: Non-streaming convenience method that collects the full stream internally and returns a single dictionary with the complete answer string and the full event list. Useful when you want the benefits of the streaming pipeline (timeouts, callbacks, event logging) without implementing a generator consumer loop.
Implementation: Internally calls stream_events(), buffers all chunks, and concatenates them.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question. |
| `context` | `Optional[Dict]` | No | Supplemental context forwarded to generation. |

Returns: Dict[str, Any] with the following fields:
| Key | Type | Description |
|---|---|---|
| `answer` | `str` | The complete concatenated answer text from all CHUNK events. Empty string if no chunks were emitted. |
| `events` | `List[StreamEvent]` | Ordered list of every `StreamEvent` emitted during the stream (metadata + chunks). |

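
The buffering that `query()` is described as doing amounts to a small loop over the event stream. The sketch below reproduces the documented return shape with a hypothetical `DemoEvent` stand-in and a hypothetical `collect` helper; it is not the library's code.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List


@dataclass(frozen=True)
class DemoEvent:
    """Hypothetical stand-in for StreamEvent."""
    type: str
    data: Any

    @property
    def is_chunk(self) -> bool:
        return self.type == "chunk"


def collect(events: Iterable[DemoEvent]) -> Dict[str, Any]:
    """Buffer every event and join the chunk payloads into one answer."""
    seen: List[DemoEvent] = []
    parts: List[str] = []
    for ev in events:
        seen.append(ev)
        if ev.is_chunk:
            parts.append(ev.data)
    return {"answer": "".join(parts), "events": seen}


trace = [
    DemoEvent("retrieval_start", {"query": "q"}),
    DemoEvent("chunk", "RAG "),
    DemoEvent("chunk", "works."),
    DemoEvent("generation_done", {"total_latency": 0.1}),
]
result = collect(trace)
```

An empty or error-only stream yields an empty `answer`, so callers that need to distinguish "no answer" from "failed" should inspect `events` for an ERROR entry.
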
Example:
result = rag.query("What are the main benefits of RAG?")
print(result["answer"])
# Inspect the event trace
for ev in result["events"]:
    if ev.type == EventType.GENERATION_DONE:
        print(f"Total latency: {ev.data['total_latency']:.2f}s")

3.4 Asynchronous Streaming API
astream()
rag.astream(
    query: str,
    context: Optional[Dict] = None,
) -> AsyncIterator[str]

Purpose: Async generator that yields plain text chunks only. The async counterpart of stream(). All metadata events are consumed internally, but callbacks still fire. Preferred in async applications (FastAPI, Starlette, etc.) over the synchronous API to avoid blocking the event loop.
Implementation: Delegates to astream_events() and filters for event.is_chunk.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question. |
| `context` | `Optional[Dict]` | No | Supplemental context forwarded to generation. |

Yields: str — individual text chunks.
Example:
import asyncio

async def main():
    print("Answer: ", end="")
    async for chunk in rag.astream("How does vector similarity search work?"):
        print(chunk, end="", flush=True)
    print()

asyncio.run(main())

astream_events()
rag.astream_events(
    query: str,
    context: Optional[Dict] = None,
) -> AsyncIterator[StreamEvent]

Purpose: The core implementation of the entire streaming system. All other methods (stream, stream_events, astream, query) delegate to this async generator. Yields all StreamEvent objects in lifecycle order with monotonic latency timestamps.
Full pipeline executed:
- Emit a `RETRIEVAL_START` event.
- Await `_retrieve(query)` with `retrieval_timeout`. On timeout or exception → emit an `ERROR` event and return.
- Emit a `RETRIEVAL_DONE` event with the `num_docs` and `docs` payload.
- Build the context string and format the prompt from `StreamConfig.prompt_template`.
- Emit a `GENERATION_START` event with the `prompt_chars` payload.
- Iterate `_generate()` with a sliding `generation_timeout` deadline (applied per token via `asyncio.wait_for(__anext__())`). On timeout or exception → emit an `ERROR` event and return.
- For each token: fire the `on_chunk` callback, then emit a `CHUNK` event.
- Always call `gen.aclose()` in `finally` to release LLM resources (e.g., open HTTP connections).
- Emit a `GENERATION_DONE` event with the `total_latency` payload.
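
One way to read "applied per token via `asyncio.wait_for(__anext__())`" is that each pull from the generator gets its own timeout window, with `aclose()` in a `finally` block. The sketch below shows that pattern in isolation; `slow_tokens` and `consume` are hypothetical names, and how the module actually computes the remaining budget is not specified in this reference.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def slow_tokens() -> AsyncIterator[str]:
    """Hypothetical LLM stream whose third token stalls."""
    yield "fast "
    yield "fast "
    await asyncio.sleep(1.0)
    yield "late"


async def consume(timeout: float) -> Tuple[List[str], str]:
    """Read tokens, giving each one at most `timeout` seconds to arrive."""
    gen = slow_tokens()
    tokens: List[str] = []
    status = "done"
    try:
        while True:
            try:
                token = await asyncio.wait_for(gen.__anext__(), timeout)
            except StopAsyncIteration:
                break                    # generator finished normally
            except asyncio.TimeoutError:
                status = "timeout"       # the real pipeline would emit ERROR here
                break
            tokens.append(token)
    finally:
        await gen.aclose()               # release the generator on every path
    return tokens, status


tokens, status = asyncio.run(consume(timeout=0.05))
```

The two prompt tokens are delivered and the stalled third one triggers the timeout branch, which mirrors the documented case where zero or more chunks precede a generation ERROR.
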
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question. |
| `context` | `Optional[Dict]` | No | Supplemental context forwarded to generation (Strategy 3 only). |

Yields: StreamEvent — all events in lifecycle order.
Example:
import asyncio
from fennec_community.rag.types.streaming_rag import EventType

async def rich_consumer():
    async for event in rag.astream_events("Describe transformer architecture"):
        match event.type:
            case EventType.RETRIEVAL_START:
                print(f"🔍 Retrieving... (t={event.latency:.3f}s)")
            case EventType.RETRIEVAL_DONE:
                print(f"📄 Got {event.data['num_docs']} docs (t={event.latency:.3f}s)")
            case EventType.GENERATION_START:
                print(f"✍️ Generating {event.data['prompt_chars']} char prompt...")
            case EventType.CHUNK:
                print(event.data, end="", flush=True)
            case EventType.GENERATION_DONE:
                print(f"\n✅ Done in {event.data['total_latency']:.2f}s")
            case EventType.ERROR:
                print(f"\n❌ Error at [{event.data['stage']}]: {event.data['msg']}")

asyncio.run(rich_consumer())

3.5 Runtime Statistics
stats (property)
rag.stats -> Dict[str, Any]

Purpose: Return a snapshot of all cumulative streaming statistics since the instance was created. The counters are never reset during the lifetime of the instance.
Parameters: None — accessed as a property (rag.stats, not rag.stats()).
Returns: Dict[str, Any] with the following fields:
| Key | Type | Description |
|---|---|---|
| `total_streams` | `int` | Total number of `astream_events()` calls initiated (including those that errored). |
| `total_chunks` | `int` | Total CHUNK events emitted across all streams. Directly proportional to the volume of generated text. |
| `total_errors` | `int` | Total ERROR events emitted (retrieval timeouts + generation timeouts + exceptions). |
| `avg_stream_latency_s` | `float` | Mean elapsed seconds per stream, rounded to 3 decimal places. Only streams that completed (reached GENERATION_DONE) contribute to this average. |

Example:
import json
# After several queries:
print(json.dumps(rag.stats, indent=2))
# {
#   "total_streams": 12,
#   "total_chunks": 847,
#   "total_errors": 1,
#   "avg_stream_latency_s": 2.341
# }

3.6 Representation
__repr__()
repr(rag)

Purpose: Return a concise representation of the instance's LLM capability and chunk size. Useful for logging and REPL inspection.
Returns: str in one of three forms:
| LLM capability | Output |
|---|---|
| Async LLM (`AsyncStreamableLLM`) | `StreamingRAG(llm=async, chunk_size=10)` |
| Sync LLM (`SyncStreamableLLM`) | `StreamingRAG(llm=sync, chunk_size=10)` |
| No LLM (simulation fallback) | `StreamingRAG(llm=sim, chunk_size=10)` |

Example:
print(repr(rag))
# StreamingRAG(llm=async, chunk_size=10)

4. StreamEvent
from fennec_community.rag.types.streaming_rag import StreamEvent

StreamEvent is a frozen dataclass representing a single event emitted during a stream. It is the universal carrier for both text chunks and lifecycle metadata.
4.1 Attributes
| Attribute | Type | Description |
|---|---|---|
| `type` | `EventType` | The event category. Determines how `data` should be interpreted. |
| `data` | `Any` | Event payload. A plain `str` for CHUNK events; a `dict` for all metadata events. |
| `latency` | `float` | Seconds elapsed since the stream started (monotonic clock), rounded to 4 decimal places. Always 0.0 for the very first event. |

Frozen: `StreamEvent` instances are immutable (`frozen=True`). All fields must be set at construction time.
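
What immutability means in practice for a frozen dataclass can be shown with a stand-in. `DemoEvent` below is hypothetical and only mirrors the three documented fields; assigning to a field raises `FrozenInstanceError`, and `dataclasses.replace` is the standard way to derive a modified copy.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Any


@dataclass(frozen=True)
class DemoEvent:
    """Hypothetical stand-in mirroring the documented StreamEvent fields."""
    type: str
    data: Any
    latency: float = 0.0


chunk = DemoEvent("chunk", "hello", 0.1234)

try:
    chunk.data = "changed"   # assignment on a frozen dataclass is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False

# Deriving a new event leaves the original untouched.
updated = replace(chunk, data="changed")
```

This is why a callback such as `on_event` can safely store events it receives: no later stage can alter them in place.
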
4.2 Properties
is_chunk
event.is_chunk -> bool

Purpose: Convenience predicate that is True only when event.type is EventType.CHUNK. Allows consumers to distinguish text payload events from metadata events without importing EventType.
Returns: bool
Example:
for event in rag.stream_events("What is RAG?"):
    if event.is_chunk:
        print(event.data, end="")

4.3 Special Methods
__str__()
str(event)  # → str

Purpose: Return event.data when the event is a CHUNK, or "" (empty string) for all other event types. Enables a very concise consumer pattern where StreamEvent objects can be concatenated directly as strings.
Returns: str
Example:
# Ultra-compact consumer: works because str(non-chunk event) == ""
answer = "".join(str(ev) for ev in rag.stream_events("Summarise the context"))
print(answer)

5. EventType Enumeration
from fennec_community.rag.types.streaming_rag import EventType

EventType is a str-based Enum representing all possible event categories emitted during a stream. Because it inherits from str, values can be compared directly to string literals.
| Enum Value | String Value | `data` payload | Description |
|---|---|---|---|
| `EventType.RETRIEVAL_START` | `"retrieval_start"` | `{"query": str}` | Emitted immediately before calling the retriever. Marks the start of the retrieval step. |
| `EventType.RETRIEVAL_DONE` | `"retrieval_done"` | `{"num_docs": int, "docs": List[Dict]}` | Emitted after retrieval completes successfully. `num_docs` is the raw count before `max_context_docs` truncation. |
| `EventType.GENERATION_START` | `"generation_start"` | `{"prompt_chars": int}` | Emitted immediately before calling the LLM. `prompt_chars` is the total character length of the formatted prompt. |
| `EventType.CHUNK` | `"chunk"` | `str` | A text token or word chunk produced by the LLM. For native streaming this is one token; for the word-level simulator this is `chunk_size` words. |
| `EventType.GENERATION_DONE` | `"generation_done"` | `{"total_latency": float}` | Emitted after the last chunk. `total_latency` is the full elapsed time since stream start in seconds. |
| `EventType.ERROR` | `"error"` | `{"msg": str, "stage": str}` | Emitted on retrieval timeout, generation timeout, or any unhandled exception. `stage` is either `"retrieval"` or `"generation"`. After an ERROR the generator returns and no further events are emitted. |

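
The practical effect of a str-based Enum can be checked with a two-member stand-in. `DemoEventType` is hypothetical and copies two of the documented string values; the comparisons use only standard `enum` behaviour.

```python
from enum import Enum


class DemoEventType(str, Enum):
    """Hypothetical stand-in with two of the documented values."""
    CHUNK = "chunk"
    ERROR = "error"


same = DemoEventType.CHUNK == "chunk"      # str mixin: equal to the literal
looked_up = DemoEventType("error")         # value lookup returns the member
as_plain_value = DemoEventType.CHUNK.value  # plain "chunk", e.g. for serialisation
```

Comparing against literals is handy when event types arrive as text (for example from a log or a WebSocket message), while `.value` is the explicit way to get a plain string back out.
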
Usage with match statement (Python 3.10+):
async def handle(query: str) -> None:  # wrapper so that `async for` is valid
    async for event in rag.astream_events(query):
        match event.type:
            case EventType.RETRIEVAL_DONE:
                print(f"Found {event.data['num_docs']} documents")
            case EventType.CHUNK:
                print(event.data, end="")
            case EventType.ERROR:
                print(f"Error: {event.data['msg']}")

6. Protocols
from fennec_community.rag.types.streaming_rag import SyncStreamableLLM, AsyncStreamableLLM

Both protocols are decorated with @runtime_checkable, which means isinstance(obj, SyncStreamableLLM) works at runtime. StreamingRAG uses this to select the correct generation strategy automatically.
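
It is worth knowing what a `@runtime_checkable` `isinstance` check does and does not verify: it confirms that the named method exists, not that its signature or return type matches. The sketch below uses a hypothetical `DemoSyncStreamable` protocol shaped like `SyncStreamableLLM` to show this standard `typing` behaviour.

```python
from typing import Iterator, Protocol, runtime_checkable


@runtime_checkable
class DemoSyncStreamable(Protocol):
    """Hypothetical protocol shaped like SyncStreamableLLM."""
    def stream(self, prompt: str) -> Iterator[str]: ...


class GoodLLM:
    def stream(self, prompt: str) -> Iterator[str]:
        yield from prompt.split()


class WrongSignature:
    def stream(self) -> int:  # no prompt parameter, wrong return type
        return 0


class NoStream:
    def generate(self, prompt: str) -> str:
        return prompt


good = isinstance(GoodLLM(), DemoSyncStreamable)
wrong = isinstance(WrongSignature(), DemoSyncStreamable)  # still passes the check
missing = isinstance(NoStream(), DemoSyncStreamable)
```

An object with a mismatched `stream` method therefore passes the protocol check and would only fail later, when the method is actually called with a prompt.
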
6.1 SyncStreamableLLM
class SyncStreamableLLM(Protocol):
    def stream(self, prompt: str) -> Iterator[str]: ...

Purpose: Protocol that any synchronous streaming LLM must satisfy. Implement this protocol to use Strategy 2 (sync stream wrapped in a thread executor).
Required method:
| Method | Signature | Description |
|---|---|---|
| `stream` | `(prompt: str) -> Iterator[str]` | Accept a complete prompt string and yield text tokens one at a time as they are produced. |

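
Wrapping a blocking iterator "in a thread executor" can be done by pulling each token with `loop.run_in_executor`, so the event loop is never blocked while the LLM produces the next token. The sketch below is one possible shape for that bridge, not the module's code; `bridge`, `demo_sync_stream`, and `gather_tokens` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Iterator, List

_END = object()  # sentinel returned by next() when the iterator is exhausted


def demo_sync_stream(prompt: str) -> Iterator[str]:
    """Hypothetical blocking token source standing in for llm.stream()."""
    for word in prompt.split():
        yield word + " "


async def bridge(sync_iter: Iterator[str]) -> AsyncIterator[str]:
    """Pull each token on a worker thread so the event loop stays free."""
    loop = asyncio.get_running_loop()
    while True:
        # next(sync_iter, _END) runs in the default thread pool executor.
        token = await loop.run_in_executor(None, next, sync_iter, _END)
        if token is _END:
            return
        yield token


async def gather_tokens() -> List[str]:
    return [t async for t in bridge(demo_sync_stream("a b c"))]


tokens = asyncio.run(gather_tokens())
```

Only one `next()` call is in flight at a time, so the underlying iterator is never advanced concurrently even though successive calls may land on different pool threads.
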
Implementation example:
from fennec_community.rag.types.streaming_rag import SyncStreamableLLM
from typing import Iterator

class MyOpenAILLM:  # implicitly satisfies SyncStreamableLLM
    def stream(self, prompt: str) -> Iterator[str]:
        # `openai_client` is assumed to be an already-configured OpenAI client
        for chunk in openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ):
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

assert isinstance(MyOpenAILLM(), SyncStreamableLLM)  # → True

6.2 AsyncStreamableLLM
class AsyncStreamableLLM(Protocol):
    async def astream(self, prompt: str) -> AsyncIterator[str]: ...

Purpose: Protocol that any async streaming LLM must satisfy. Implement this protocol to use Strategy 1 (native async stream), the highest-fidelity and lowest-latency option.
Required method:
| Method | Signature | Description |
|---|---|---|
| `astream` | `async (prompt: str) -> AsyncIterator[str]` | Accept a complete prompt and asynchronously yield tokens as they are produced. |

Implementation example:
from fennec_community.rag.types.streaming_rag import AsyncStreamableLLM
from typing import AsyncIterator

class MyAsyncLLM:  # implicitly satisfies AsyncStreamableLLM
    async def astream(self, prompt: str) -> AsyncIterator[str]:
        # `async_openai_client` is assumed to be an already-configured async OpenAI client
        async for chunk in await async_openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ):
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

assert isinstance(MyAsyncLLM(), AsyncStreamableLLM)  # → True

7. Generation Strategy Selection
StreamingRAG automatically selects the best available generation strategy at runtime, in priority order:
┌─────────────────────────────────────────────────────────────────┐
│ Generation Strategy Decision Tree │
├─────────────────────────────────────────────────────────────────┤
│ │
│ llm is not None │
│ AND isinstance(llm, AsyncStreamableLLM)? │
│ │ │
│ ├── YES → Strategy 1: Native async stream │
│ │ Calls llm.astream(prompt) │
│ │ ✅ Best latency, true token-level streaming │
│ │ │
│ └── NO ──→ isinstance(llm, SyncStreamableLLM)? │
│ │ │
│ ├── YES → Strategy 2: Sync stream │
│ │ Runs llm.stream(prompt) │
│ │ in a thread via executor │
│ │ Bridges sync → async safely │
│ │ ✅ Token-level, thread-safe │
│ │ │
│ └── NO ──→ Strategy 3: Simulation │
│ Calls rag_system.generate() │
│ Splits answer into word │
│ chunks of size=chunk_size │
│ with sim_delay between each │
│ ⚠️ Pseudo-streaming only │
└─────────────────────────────────────────────────────────────────┘

| Strategy | Requires | LLM method used | Token-level? |
|---|---|---|---|
| 1: Native async | `llm` implements `AsyncStreamableLLM` | `llm.astream(prompt)` | ✅ Yes |
| 2: Sync wrapped | `llm` implements `SyncStreamableLLM` | `llm.stream(prompt)` in thread | ✅ Yes |
| 3: Simulation | `rag_system.generate()` exists | `rag_system.generate(query, context)` | ⚠️ Word-level only |

Strategy 3 error: If `rag_system.generate` is also absent, `AttributeError` is raised with a descriptive message instructing the caller to provide a proper LLM.
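
The decision order and the Strategy 3 simulator can be sketched together. This is an illustration under assumptions: `pick_strategy` uses plain attribute checks in place of the protocol `isinstance` tests, it omits the `rag_system.llm` fallback, and `simulate` and `DemoRAG` are hypothetical names. The trailing space on each chunk is also an assumption.

```python
import asyncio
from typing import Any, AsyncIterator, List


def pick_strategy(llm: Any, rag_system: Any) -> str:
    """Return the label of the first strategy whose requirement is met."""
    # Attribute checks stand in for the AsyncStreamableLLM / SyncStreamableLLM tests.
    if llm is not None and callable(getattr(llm, "astream", None)):
        return "async"
    if llm is not None and callable(getattr(llm, "stream", None)):
        return "sync"
    if callable(getattr(rag_system, "generate", None)):
        return "sim"
    raise AttributeError("no streaming LLM and rag_system has no generate()")


async def simulate(answer: str, chunk_size: int, sim_delay: float) -> AsyncIterator[str]:
    """Strategy 3 sketch: re-emit a finished answer in groups of words."""
    words = answer.split()
    for i in range(0, len(words), chunk_size):
        yield " ".join(words[i:i + chunk_size]) + " "
        await asyncio.sleep(sim_delay)


class DemoRAG:
    """Hypothetical backend exposing only generate()."""
    def generate(self, query: str, context: Any = None) -> str:
        return "one two three four five"


async def run_demo() -> List[str]:
    rag = DemoRAG()
    answer = rag.generate("q")
    return [c async for c in simulate(answer, chunk_size=2, sim_delay=0.0)]


strategy = pick_strategy(None, DemoRAG())
chunks = asyncio.run(run_demo())
```

Because the simulator only starts after `generate()` has returned the whole answer, time-to-first-chunk in Strategy 3 equals the full generation time; the streaming is cosmetic, as the table's "Word-level only" note indicates.
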
8. Event Stream Lifecycle
A complete, error-free stream always produces events in this exact order:
Time →
│
├─ [t=0.0000] RETRIEVAL_START {"query": "..."}
│
│ [retrieval executes — up to retrieval_timeout seconds]
│
├─ [t=0.Xsec] RETRIEVAL_DONE {"num_docs": N, "docs": [...]}
│
│ [context built, prompt formatted]
│
├─ [t=0.Xsec] GENERATION_START {"prompt_chars": N}
│
│ [LLM streaming begins]
│
├─ [t=0.Xsec] CHUNK "First few words "
├─ [t=0.Xsec] CHUNK "of the answer "
├─ [t=0.Xsec] CHUNK "..."
│ ...
├─ [t=0.Xsec] CHUNK "last chunk."
│
└─ [t=0.Xsec] GENERATION_DONE {"total_latency": X.XXXX}
─── On retrieval error or timeout: ───────────────────────────────
│
├─ RETRIEVAL_START
└─ ERROR {"msg": "Retrieval timed out", "stage": "retrieval"}
[stream ends — no further events]
─── On generation error or timeout: ──────────────────────────────
│
├─ RETRIEVAL_START
├─ RETRIEVAL_DONE
├─ GENERATION_START
├─ CHUNK (zero or more chunks may have been emitted before error)
└─ ERROR {"msg": "Generation timed out", "stage": "generation"}
[stream ends; no further events]

9. Error Handling Reference
| Scenario | `ERROR.data["stage"]` | `ERROR.data["msg"]` | Recovery |
|---|---|---|---|
| Retrieval timed out (exceeded `retrieval_timeout`) | `"retrieval"` | `"Retrieval timed out"` | Stream ends. Retry with a shorter query or increase `retrieval_timeout`. |
| Retrieval raised an exception | `"retrieval"` | Exception message | Stream ends. Check the `rag_system.retrieve()` implementation. |
| Generation timed out (sliding deadline exceeded) | `"generation"` | `"Generation timed out"` | Stream ends. Increase `generation_timeout` or reduce `max_context_docs`. |
| Generation raised an exception | `"generation"` | Exception message | Stream ends. Check LLM connectivity and the `stream()` / `astream()` implementation. |
| Strategy 3: `rag_system.generate` missing | Raised before stream starts | `AttributeError` with descriptive message | Provide an LLM implementing `SyncStreamableLLM` or `AsyncStreamableLLM`. |

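
Since failures arrive as ERROR events rather than exceptions, a consumer that prefers exception-style handling can convert them. The helper below is a sketch of that idea; `answer_or_raise` and `StreamFailed` are hypothetical names, and plain `(type, data)` tuples stand in for `StreamEvent` objects.

```python
from typing import Any, Iterable, List, Tuple

Event = Tuple[str, Any]  # (type, data) pairs stand in for StreamEvent objects


class StreamFailed(RuntimeError):
    """Hypothetical exception carrying the failing stage and the partial text."""

    def __init__(self, stage: str, msg: str, partial: str) -> None:
        super().__init__(f"{stage}: {msg}")
        self.stage = stage
        self.partial = partial


def answer_or_raise(events: Iterable[Event]) -> str:
    """Join chunk payloads; convert an ERROR event into an exception."""
    parts: List[str] = []
    for kind, data in events:
        if kind == "chunk":
            parts.append(data)
        elif kind == "error":
            raise StreamFailed(data["stage"], data["msg"], "".join(parts))
    return "".join(parts)


failing = [
    ("retrieval_start", {"query": "q"}),
    ("retrieval_done", {"num_docs": 1, "docs": []}),
    ("generation_start", {"prompt_chars": 42}),
    ("chunk", "Partial "),
    ("error", {"msg": "Generation timed out", "stage": "generation"}),
]

try:
    answer_or_raise(failing)
    outcome = "ok"
except StreamFailed as exc:
    outcome = f"{exc.stage}|{exc.partial}"
```

Keeping the partial text on the exception lets the caller decide whether to show what was generated before the failure or to retry from scratch.
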
10. Quick-Start Example
import asyncio
import json
from fennec_community.rag.types.streaming_rag import StreamingRAG, StreamConfig, EventType
from fennec_community.rag.types.streaming_rag import SyncStreamableLLM, AsyncStreamableLLM
# ═══════════════════════════════════════════════════════════════════
# 1. Implement the AsyncStreamableLLM protocol (Strategy 1)
# ═══════════════════════════════════════════════════════════════════
class MyAsyncLLM:
    async def astream(self, prompt: str):
        # Replace with a real async LLM call (OpenAI, Anthropic, etc.)
        words = f"Answer based on context: {prompt[:30]}...".split()
        for word in words:
            yield word + " "
            await asyncio.sleep(0.02)

# ═══════════════════════════════════════════════════════════════════
# 2. Configure the streamer
# ═══════════════════════════════════════════════════════════════════
config = StreamConfig(
    max_context_docs=5,
    max_doc_chars=600,
    retrieval_timeout=10.0,
    generation_timeout=60.0,
)

# ═══════════════════════════════════════════════════════════════════
# 3. Callbacks (optional)
# ═══════════════════════════════════════════════════════════════════
log = []

def on_chunk(chunk: str):
    log.append(chunk)  # side-effect: accumulate tokens elsewhere

def on_event(event):
    if event.type == EventType.ERROR:
        print(f"[CALLBACK] ❌ Error: {event.data}")

# ═══════════════════════════════════════════════════════════════════
# 4. Instantiate
# ═══════════════════════════════════════════════════════════════════
rag = StreamingRAG(
    rag_system=my_rag_system,  # placeholder: your existing RAG backend
    llm=MyAsyncLLM(),
    config=config,
    on_chunk=on_chunk,
    on_event=on_event,
)
print(repr(rag))
# StreamingRAG(llm=async, chunk_size=10)
# ═══════════════════════════════════════════════════════════════════
# 5. Synchronous plain text streaming
# ═══════════════════════════════════════════════════════════════════
print("Answer: ", end="")
for chunk in rag.stream("What is retrieval-augmented generation?"):
    print(chunk, end="", flush=True)
print()

# ═══════════════════════════════════════════════════════════════════
# 6. Synchronous rich event streaming
# ═══════════════════════════════════════════════════════════════════
for event in rag.stream_events("Explain transformer attention"):
    if event.type == EventType.RETRIEVAL_DONE:
        print(f"\n[Docs: {event.data['num_docs']} Latency: {event.latency:.3f}s]")
    elif event.type == EventType.CHUNK:
        print(event.data, end="", flush=True)
    elif event.type == EventType.GENERATION_DONE:
        print(f"\n[Done in {event.data['total_latency']:.2f}s]")
    elif event.type == EventType.ERROR:
        print(f"\n[Error at {event.data['stage']}: {event.data['msg']}]")
        break

# ═══════════════════════════════════════════════════════════════════
# 7. Non-streaming (collect full answer)
# ═══════════════════════════════════════════════════════════════════
result = rag.query("Summarise the key points about BERT")
print(result["answer"])
print(f"Events captured: {len(result['events'])}")

# ═══════════════════════════════════════════════════════════════════
# 8. Async plain text streaming
# ═══════════════════════════════════════════════════════════════════
async def async_text():
    print("Async: ", end="")
    async for chunk in rag.astream("How does vector search work?"):
        print(chunk, end="", flush=True)
    print()

asyncio.run(async_text())
# ═══════════════════════════════════════════════════════════════════
# 9. Async rich event streaming
# ═══════════════════════════════════════════════════════════════════
async def async_events():
    async for event in rag.astream_events("Describe the RAG pipeline"):
        match event.type:
            case EventType.RETRIEVAL_START:
                print(f"🔍 Retrieving (t={event.latency:.3f}s)")
            case EventType.RETRIEVAL_DONE:
                print(f"📄 Retrieved {event.data['num_docs']} docs")
            case EventType.GENERATION_START:
                print(f"✍️ Generating ({event.data['prompt_chars']} chars)")
            case EventType.CHUNK:
                print(event.data, end="", flush=True)
            case EventType.GENERATION_DONE:
                print(f"\n✅ Total: {event.data['total_latency']:.2f}s")
            case EventType.ERROR:
                print(f"\n❌ [{event.data['stage']}] {event.data['msg']}")

asyncio.run(async_events())
# ═══════════════════════════════════════════════════════════════════
# 10. Runtime statistics
# ═══════════════════════════════════════════════════════════════════
print(json.dumps(rag.stats, indent=2))
# {
# "total_streams": 4,
# "total_chunks": 312,
# "total_errors": 0,
# "avg_stream_latency_s": 1.847
# }
# ═══════════════════════════════════════════════════════════════════
# 11. Protocol compliance check
# ═══════════════════════════════════════════════════════════════════
from fennec_community.rag.types.streaming_rag import AsyncStreamableLLM, SyncStreamableLLM

print(isinstance(MyAsyncLLM(), AsyncStreamableLLM))  # → True

class MySyncLLM:
    def stream(self, prompt: str):
        for word in "Hello world from sync LLM".split():
            yield word + " "

print(isinstance(MySyncLLM(), SyncStreamableLLM))  # → True

# Switch to sync LLM
rag2 = StreamingRAG(
    rag_system=my_rag_system,
    llm=MySyncLLM(),
    chunk_size=5,  # override via constructor shortcut
)
print(repr(rag2))
# StreamingRAG(llm=sync, chunk_size=5)

# ═══════════════════════════════════════════════════════════════════
# 12. Fallback (no LLM: word-level simulation)
# ═══════════════════════════════════════════════════════════════════
rag3 = StreamingRAG(
    rag_system=my_rag_system,  # must expose .generate()
    # no llm provided, so Strategy 3 (simulation) is activated
    config=StreamConfig(chunk_size=3, sim_delay=0.05),
)
print(repr(rag3))
# StreamingRAG(llm=sim, chunk_size=3)
for chunk in rag3.stream("Quick question for simulation mode"):
    print(chunk, end="", flush=True)
print()

Simple Real Example
from fennec_community.llm import GeminiInterface
from fennec_community.document_loaders import TextLoader
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem
from fennec_community.rag.types.streaming_rag import StreamingRAG, StreamConfig, EventType
loader_1 = TextLoader("./data_kn/faq.txt").load()
chunker = ArabicTextChunker(chunk_size=100, overlap=20)
embedder = OllamaEmbedder()
vector_db = FAISSVectorDatabase(embedder=embedder)
llm = GeminiInterface(api_key=llm_api)  # `llm_api` is your Gemini API key, defined elsewhere
context_manager = ContextManager()
rag_system = RAGSystem(
    llm=llm,
    vector_db=vector_db,
    chunker=chunker,
    context_manager=context_manager,
)
rag_system.add_documents(loader_1)

config = StreamConfig(
    chunk_size=3,
    sim_delay=0.05,
    retrieval_timeout=15.0,
    generation_timeout=90.0,
    max_context_docs=5,
    max_doc_chars=800,
)

def on_chunk_side_effect(token: str):
    # Example: send over a WebSocket
    # websocket.send(token)
    pass  # no printing here

def on_event_side_effect(event):
    if event.type == EventType.ERROR:
        print(f"Error in {event.data['stage']}: {event.data['msg']}")

rag = StreamingRAG(
    rag_system=rag_system,
    llm=llm,
    config=config,
    on_chunk=on_chunk_side_effect,  # side effect only
    on_event=on_event_side_effect,
)

# The query is in Arabic to match the Arabic FAQ corpus:
# "What payment methods are available?"
for token in rag.stream("ماهي طرق الدفع المتاحة؟"):
    print(token, end="", flush=True)  # the only place that prints
print()