Graph-RAG Module `graph_rag`: Enterprise API Reference
Table of Contents
- Overview
- Architecture
- Quick Start
- Class: ConfigGraphRAG
- Class: GraphNode
- Class: GraphEdge
- Class: KnowledgeGraph
- Class: GraphRAG
- Return Value Reference
- Graph Context Expansion — Score Decay Model
- Environment Variables Reference
- Saved Files Layout
- Complete Examples
Overview
graph_rag is a production-grade graph-augmented retrieval system that combines a FAISS-powered vector database with an in-memory knowledge graph. Instead of treating documents as isolated chunks, graph_rag models entities and their relationships explicitly. At query time, an initial vector similarity search retrieves seed nodes, and a BFS graph traversal expands context through related entities, delivering richer, more connected answers than flat vector search alone.
Key capabilities at a glance:
| Capability | Detail |
|---|---|
| Graph-augmented retrieval | Seed nodes found by vector similarity are expanded through the knowledge graph via BFS |
| Semantic search | Pure vector similarity search with optional graph metadata enrichment |
| Multilingual LLM prompting | Built-in prompt templates for Arabic (ar) and English (en) |
| Async API | All major operations have async counterparts (aquery, aretrieve, agenerate) |
| Persistence | Full state save/load to disk (graph JSON + vector DB + mappings) |
| Graph analytics | Degree computation, connected components, shortest path, subgraph extraction |
| Environment-driven config | All key parameters can be set via environment variables |
| Score decay expansion | Neighbour scores decay exponentially with depth (factor 0.5^depth) |
Architecture
┌────────────────────────────────────────────────────────────────┐
│ GraphRAG │
│ │
│ ┌─────────────────────┐ ┌──────────────────────────────┐ │
│ │ VectorDatabase │ │ KnowledgeGraph │ │
│ │ (FAISS + embedder) │ │ nodes · edges · adjacency │ │
│ └────────┬────────────┘ └──────────────┬───────────────┘ │
│ │ vector similarity search │ BFS expansion │
│ │ │ │
│ └─────────────────┬────────────────┘ │
│ │ │
│ _expand_graph_context │
│ │ │
│ _build_results → LLM prompt → answer │
└────────────────────────────────────────────────────────────────┘
Data flow:
add_document_with_relations()
│
├── GraphNode per entity → KnowledgeGraph.add_node()
├── DocumentChunk per entity → VectorDatabase.add()
└── GraphEdge per relation → KnowledgeGraph.add_edge()
retrieve_with_context(query)
│
├── VectorDatabase.search() → seed (chunk, score) pairs
├── _chunk_to_node mapping → seed node scores
├── _expand_graph_context() → BFS neighbour expansion
└── _build_results() → ranked result dicts
query(query)
│
├── retrieve_with_context()
├── _build_graph_context() → rich LLM context string
├── _build_prompt() → language-aware prompt
└── llm.generate(prompt) → final answer string
Quick Start
from fennec_community.rag.types.graph_rag import GraphRAG, GraphNode, GraphEdge, KnowledgeGraph, ConfigGraphRAG
# 1. Configure
config = ConfigGraphRAG(k=5, context_depth=2, embedder_name="all-MiniLM-L6-v2")
# 2. Initialise (requires a VectorDatabase instance)
rag = GraphRAG(vector_db=my_vector_db, llm=my_llm, config=config)
# 3. Ingest a document
rag.add_document_with_relations(
content="Paris is the capital of France. The Eiffel Tower is in Paris.",
entities=[
{"id": "Paris", "type": "city", "text": "Paris"},
{"id": "France", "type": "country", "text": "France"},
{"id": "EiffelTower", "type": "landmark", "text": "Eiffel Tower"},
],
relations=[
{"source": "Paris", "target": "France", "type": "capital_of"},
{"source": "EiffelTower", "target": "Paris", "type": "located_in"},
],
doc_id="doc_001",
)
# 4. Query
answer = rag.query("Where is the Eiffel Tower?", language="en")
print(answer)
# 5. Async usage
answer = await rag.aquery("Where is the Eiffel Tower?", language="en")
Class: ConfigGraphRAG
from fennec_community.rag.types.graph_rag import ConfigGraphRAG
A @dataclass that centralises all tunable parameters for the GraphRAG system. Validated automatically at construction time.
__init__ — All Configuration Fields
ConfigGraphRAG(
# ── Cache
l1_size: int = 50,
l2_size: int = 50,
l3_size: int = 50,
cache_ttl: int = 300,
# ── Retrieval
k: int = 5,
context_depth: int = 2,
max_depth: int = 2,
# ── Embedding
embedder_name: str = "paraphrase-multilingual-MiniLM-L12-v2",
embedding_dim: Optional[int] = None,
batch_size: int = 32,
normalize_embeddings: bool = True,
# ── FAISS
use_gpu: bool = False,
faiss_nlist: Optional[int] = None,
faiss_nprobe: int = 10,
rebuild_threshold: int = 100,
# ── Search
enable_hybrid_search: bool = False,
hybrid_alpha: float = 0.5,
# ── Performance
max_cache_embeddings: int = 1000,
parallel_processing: bool = True,
# ── Logging
log_level: str = "INFO",
)
Purpose: Creates and validates the complete configuration object. Raises ValueError immediately on invalid values so misconfiguration is caught at startup, not at query time.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| l1_size | int | 50 | Level-1 cache capacity (fastest layer). |
| l2_size | int | 50 | Level-2 cache capacity (mid layer). |
| l3_size | int | 50 | Level-3 cache capacity (largest layer). |
| cache_ttl | int | 300 | Cache entry time-to-live in seconds. |
| k | int | 5 | Default number of top results returned by retrieval. Must be ≥ 1. |
| context_depth | int | 2 | Default BFS expansion depth from each seed node. 0 disables expansion. Must be ≥ 0. |
| max_depth | int | 2 | Maximum BFS depth used by KnowledgeGraph.get_neighbors. Must be ≥ 0. |
| embedder_name | str | "paraphrase-multilingual-MiniLM-L12-v2" | Sentence-transformer model name. Passed to the vector database's embedder. |
| embedding_dim | Optional[int] | None | Override embedding dimensionality. Auto-detected from model when None. |
| batch_size | int | 32 | Number of texts embedded per batch. Must be ≥ 1. |
| normalize_embeddings | bool | True | L2-normalise embeddings before indexing (required for cosine similarity). |
| use_gpu | bool | False | Move FAISS index to GPU via faiss.index_cpu_to_gpu. |
| faiss_nlist | Optional[int] | None | IVF cluster count. Auto-calculated from corpus size when None. |
| faiss_nprobe | int | 10 | Number of IVF cells probed at query time. Higher = better recall, slower. |
| rebuild_threshold | int | 100 | Rebuild FAISS index after this many additions. |
| enable_hybrid_search | bool | False | Enable hybrid semantic + keyword search. |
| hybrid_alpha | float | 0.5 | Blend ratio: 1.0 = pure semantic, 0.0 = pure keyword. Must be in [0, 1]. |
| max_cache_embeddings | int | 1000 | Maximum number of embedding vectors to cache in memory. |
| parallel_processing | bool | True | Enable parallel processing where available. |
| log_level | str | "INFO" | Python logging level: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". |
Raises: ValueError for any of:
- k < 1
- context_depth < 0
- max_depth < 0
- hybrid_alpha outside [0, 1]
- batch_size < 1
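The validation rules listed above can be sketched with a dataclass `__post_init__` hook. This is a minimal illustration, not the library's source: `SketchConfig` and `is_valid` are hypothetical names, and only the five validated fields are modelled.

```python
from dataclasses import dataclass


@dataclass
class SketchConfig:
    """Hypothetical stand-in for ConfigGraphRAG; only validated fields are modelled."""

    k: int = 5
    context_depth: int = 2
    max_depth: int = 2
    hybrid_alpha: float = 0.5
    batch_size: int = 32

    def __post_init__(self) -> None:
        # Each check mirrors one documented ValueError condition.
        if self.k < 1:
            raise ValueError("k must be >= 1")
        if self.context_depth < 0:
            raise ValueError("context_depth must be >= 0")
        if self.max_depth < 0:
            raise ValueError("max_depth must be >= 0")
        if not 0.0 <= self.hybrid_alpha <= 1.0:
            raise ValueError("hybrid_alpha must be in [0, 1]")
        if self.batch_size < 1:
            raise ValueError("batch_size must be >= 1")


def is_valid(**overrides) -> bool:
    """Return True if the overrides pass validation, False if ValueError is raised."""
    try:
        SketchConfig(**overrides)
        return True
    except ValueError:
        return False
```

Because the checks run at construction time, a bad value fails where the configuration is built rather than later, during a query.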
Example:
config = ConfigGraphRAG(
k=10,
context_depth=3,
embedder_name="all-MiniLM-L6-v2",
use_gpu=True,
enable_hybrid_search=True,
hybrid_alpha=0.7,
log_level="DEBUG",
)
from_env
@classmethod
ConfigGraphRAG.from_env() -> ConfigGraphRAG
Purpose: Factory that reads configuration from environment variables. Ideal for containerised deployments where configuration is injected via the environment rather than hardcoded.
Parameters: None (reads from os.environ).
Returns: ConfigGraphRAG — a fully validated instance built from environment values, falling back to defaults for any variable not set.
Supported environment variables:
| Variable | Maps to | Default |
|---|---|---|
| GRAPHRAG_K | k | 5 |
| GRAPHRAG_CONTEXT_DEPTH | context_depth | 2 |
| GRAPHRAG_MAX_DEPTH | max_depth | 2 |
| GRAPHRAG_EMBEDDER | embedder_name | "all-MiniLM-L6-v2" |
| GRAPHRAG_BATCH_SIZE | batch_size | 32 |
| GRAPHRAG_USE_GPU | use_gpu | false |
| GRAPHRAG_LOG_LEVEL | log_level | "INFO" |
| GRAPHRAG_HYBRID_SEARCH | enable_hybrid_search | false |
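As a rough sketch of how such a factory could map these variables to keyword arguments, the helper below reads them with the defaults from the table. `read_graphrag_env` and `_as_bool` are hypothetical names, and the accepted boolean spellings are an assumption; the library's own parsing may differ.

```python
import os
from typing import Any, Dict, Mapping, Optional


def _as_bool(raw: str) -> bool:
    # Assumption: common truthy spellings are accepted.
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def read_graphrag_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Collect GRAPHRAG_* variables into keyword arguments, using the documented defaults."""
    env = os.environ if env is None else env
    return {
        "k": int(env.get("GRAPHRAG_K", "5")),
        "context_depth": int(env.get("GRAPHRAG_CONTEXT_DEPTH", "2")),
        "max_depth": int(env.get("GRAPHRAG_MAX_DEPTH", "2")),
        "embedder_name": env.get("GRAPHRAG_EMBEDDER", "all-MiniLM-L6-v2"),
        "batch_size": int(env.get("GRAPHRAG_BATCH_SIZE", "32")),
        "use_gpu": _as_bool(env.get("GRAPHRAG_USE_GPU", "false")),
        "log_level": env.get("GRAPHRAG_LOG_LEVEL", "INFO"),
        "enable_hybrid_search": _as_bool(env.get("GRAPHRAG_HYBRID_SEARCH", "false")),
    }
```

Passing an explicit mapping instead of relying on `os.environ` keeps the helper easy to exercise in tests without mutating the process environment.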
Example:
export GRAPHRAG_K=10
export GRAPHRAG_EMBEDDER=all-mpnet-base-v2
export GRAPHRAG_USE_GPU=true
config = ConfigGraphRAG.from_env()
rag = GraphRAG(vector_db=vdb, config=config)
to_dict
config.to_dict() -> dict
Purpose: Serialises the core configuration parameters to a plain Python dictionary. Useful for logging the active configuration, storing it alongside saved model artefacts, or debugging.
Parameters: None.
Returns: dict with the following keys: l1_size, l2_size, l3_size, k, context_depth, max_depth, embedder_name, batch_size, use_gpu.
Example:
import json
config = ConfigGraphRAG(k=8)
print(json.dumps(config.to_dict(), indent=2))
# {
# "l1_size": 50,
# "k": 8,
# "embedder_name": "paraphrase-multilingual-MiniLM-L12-v2",
# ...
# }
Class: GraphNode
from fennec_community.rag.types.graph_rag import GraphNode
A @dataclass representing a single entity node in the knowledge graph. Every node has a unique ID, textual content, a type label, and optional metadata and embedding.
__init__ — Node Constructor
GraphNode(
id: str,
content: str,
node_type: str,
metadata: Dict[str, Any] = field(default_factory=dict),
embedding: Optional[List[float]] = None,
)
Purpose: Creates a validated graph node. Raises ValueError on empty id, content, or node_type.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| id | str | required | Unique node identifier within the graph (e.g., "Paris", "entity_42"). Must be non-empty. |
| content | str | required | Human-readable text content of the entity (e.g., "Paris is the capital of France"). Must be non-empty. |
| node_type | str | required | Semantic category label (e.g., "city", "person", "concept"). Must be non-empty. |
| metadata | Dict[str, Any] | {} | Arbitrary key-value pairs attached to the node (e.g., {"doc_id": "doc_001", "source": "wikipedia"}). |
| embedding | Optional[List[float]] | None | Pre-computed vector embedding. Usually managed by the system; set manually only when importing from external sources. |
Raises: ValueError if id, content, or node_type is empty.
Example:
node = GraphNode(
id="Paris",
content="Paris is the capital and most populous city of France.",
node_type="city",
metadata={"country": "France", "population": 2_161_000},
)
has_embedding (property)
node.has_embedding -> bool
Purpose: Quickly checks whether this node has a valid, non-empty embedding vector stored. Used internally before attempting embedding-based operations.
Returns: True if self.embedding is not None and has at least one element; False otherwise.
Example:
if not node.has_embedding:
    print(f"Node '{node.id}' needs to be embedded before indexing.")
get_content_hash
node.get_content_hash() -> str
Purpose: Generates an MD5 fingerprint of the node's content field. Use this to detect whether a node's content has changed since it was last indexed, which avoids unnecessary re-embedding.
Parameters: None.
Returns: str — 32-character hexadecimal MD5 digest of content encoded as UTF-8.
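The digest described here can be reproduced with the standard library alone. The sketch below uses a hypothetical free function, `content_hash`, rather than the method on GraphNode.

```python
import hashlib


def content_hash(content: str) -> str:
    """Return the 32-character MD5 hex digest of the UTF-8 encoded text."""
    return hashlib.md5(content.encode("utf-8")).hexdigest()
```

MD5 is adequate for change detection of this kind, but it is not collision-resistant and should not be relied on for security purposes.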
Example:
old_hash = node.get_content_hash()
node.content = "Paris is the capital of France and the largest city."
new_hash = node.get_content_hash()
if old_hash != new_hash:
    print("Content changed: re-embed and re-index this node.")
to_dict
node.to_dict(include_embedding: bool = False) -> Dict[str, Any]
Purpose: Serialises the node to a plain dictionary. Used for JSON persistence, API responses, and debugging.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| include_embedding | bool | False | When True, the embedding list is included in the output if present. Omit for lightweight serialisation. |
Returns: Dict[str, Any] with keys id, content, node_type, metadata, and optionally embedding.
Example:
# Lightweight (no embedding)
data = node.to_dict()
# {"id": "Paris", "content": "...", "node_type": "city", "metadata": {...}}
# Full (with embedding for export)
data = node.to_dict(include_embedding=True)
from_dict
@classmethod
GraphNode.from_dict(data: Dict[str, Any]) -> GraphNode
Purpose: Deserialises a GraphNode from a plain dictionary (the inverse of to_dict). Use when loading nodes from JSON files or external APIs.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| data | Dict[str, Any] | Dictionary with at least id, content, and node_type keys. metadata and embedding are optional. |
Returns: GraphNode instance.
Raises: KeyError if id, content, or node_type is missing from data.
Example:
import json
with open("nodes.json") as f:
    raw_nodes = json.load(f)
nodes = [GraphNode.from_dict(d) for d in raw_nodes]
update_metadata
node.update_metadata(key: str, value: Any) -> None
Purpose: Sets or overwrites a single metadata field on the node. Simpler than re-constructing the full metadata dict when only one field needs to change.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| key | str | The metadata key to set or overwrite. |
| value | Any | The new value for that key. |
Returns: None
Example:
node.update_metadata("verified", True)
node.update_metadata("last_updated", "2026-05-10")
merge_metadata
node.merge_metadata(metadata: Dict[str, Any]) -> None
Purpose: Merges a dictionary of new metadata into the node's existing metadata using dict.update. Existing keys are overwritten; new keys are added. More efficient than calling update_metadata in a loop.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| metadata | Dict[str, Any] | Key-value pairs to merge into the node's existing metadata. |
Returns: None
Example:
node.merge_metadata({
"source": "Wikipedia",
"confidence": 0.95,
"language": "en",
})
Class: GraphEdge
from fennec_community.rag.types.graph_rag import GraphEdge
A @dataclass representing a directed relationship between two nodes in the knowledge graph. An edge connects a source node to a target node via a named relation.
__init__ — Edge Constructor
GraphEdge(
source: str,
target: str,
relation: str,
weight: float = 1.0,
metadata: Dict[str, Any] = field(default_factory=dict),
bidirectional: bool = False,
)
Purpose: Creates a validated directed edge. Rejects empty endpoints, negative weights, and self-loops.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| source | str | required | ID of the source (originating) node. Must match a node ID in the graph. Must be non-empty. |
| target | str | required | ID of the target (destination) node. Must match a node ID in the graph. Must be non-empty. |
| relation | str | required | Semantic label for the relationship (e.g., "capital_of", "located_in", "authored_by"). Must be non-empty. |
| weight | float | 1.0 | Numeric strength of the relationship. Must be ≥ 0. Used in ranking and score propagation. |
| metadata | Dict[str, Any] | {} | Arbitrary key-value annotations (e.g., {"confidence": 0.9, "source": "NLP extraction"}). |
| bidirectional | bool | False | When True, the edge is added in both directions in the adjacency structure, without needing a separate reverse edge. |
Raises: ValueError if source or target is empty, relation is empty, weight < 0, or source == target (self-loops are not allowed).
Example:
edge = GraphEdge(
source="EiffelTower",
target="Paris",
relation="located_in",
weight=1.0,
bidirectional=False,
)
edge_id (property)
edge.edge_id -> str
Purpose: Returns a deterministic string identifier for the edge, derived from its source, relation, and target. Used as a unique key in the edge lookup dictionary inside KnowledgeGraph.
Returns: str — formatted as "{source}-{relation}->{target}".
Example:
edge = GraphEdge("Paris", "France", "capital_of")
print(edge.edge_id)  # "Paris-capital_of->France"
reverse
edge.reverse() -> GraphEdge
Purpose: Creates a new GraphEdge with the source and target swapped and the relation prefixed with "inverse_". Useful when you need to materialise the inverse direction of a relationship explicitly.
Parameters: None.
Returns: GraphEdge — a new edge instance with reversed direction. The original edge is not modified.
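To make the documented identifier format and the reversal rule concrete, here is a small sketch using a hypothetical `SketchEdge` dataclass. It models only four fields and is not the library's GraphEdge.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SketchEdge:
    """Hypothetical stand-in for GraphEdge with only the fields needed here."""

    source: str
    target: str
    relation: str
    weight: float = 1.0

    @property
    def edge_id(self) -> str:
        # Documented format: "{source}-{relation}->{target}"
        return f"{self.source}-{self.relation}->{self.target}"

    def reverse(self) -> "SketchEdge":
        # Swap the endpoints and prefix the relation; the original is left untouched.
        return replace(
            self,
            source=self.target,
            target=self.source,
            relation=f"inverse_{self.relation}",
        )
```

Since the identifier is derived from the (source, relation, target) triple, the reversed edge gets a different key from the original and the two can coexist in one lookup dictionary.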
Example:
forward = GraphEdge("Paris", "France", "capital_of", weight=1.0)
backward = forward.reverse()
print(backward)
# GraphEdge(France -> Paris [inverse_capital_of] w=1.0)
to_dict
edge.to_dict() -> Dict[str, Any]
Purpose: Serialises the edge to a plain dictionary for JSON persistence or API responses.
Parameters: None.
Returns: Dict[str, Any] with keys: source, target, relation, weight, metadata, bidirectional.
Example:
data = edge.to_dict()
# {"source": "Paris", "target": "France", "relation": "capital_of",
#  "weight": 1.0, "metadata": {}, "bidirectional": False}
from_dict
@classmethod
GraphEdge.from_dict(data: Dict[str, Any]) -> GraphEdge
Purpose: Deserialises a GraphEdge from a plain dictionary (the inverse of to_dict). Use when loading edges from JSON files or external data.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| data | Dict[str, Any] | Dictionary with at least source, target, and relation keys. weight, metadata, and bidirectional are optional with defaults. |
Returns: GraphEdge instance.
Example:
import json
with open("edges.json") as f:
    raw_edges = json.load(f)
edges = [GraphEdge.from_dict(d) for d in raw_edges]
update_weight
edge.update_weight(new_weight: float) -> None
Purpose: Updates the edge's weight in place. Validates that the new weight is non-negative before applying it.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| new_weight | float | New weight value. Must be ≥ 0. |
Returns: None
Raises: ValueError if new_weight < 0.
Example:
edge.update_weight(2.5) # Strengthen this relationship
edge.update_weight(0.1)  # Weaken this relationship
Class: KnowledgeGraph
from fennec_community.rag.types.graph_rag import KnowledgeGraph
An efficient in-memory directed graph with O(1) node lookup, O(1) edge lookup, O(1) degree computation, and BFS-based traversal. Maintains adjacency, reverse adjacency, and in-degree structures for fast graph operations.
__init__
KnowledgeGraph(config: Optional[ConfigGraphRAG] = None)
Purpose: Initialises an empty knowledge graph. Creates all internal adjacency and lookup structures. Can be used standalone or as a component of GraphRAG.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| config | Optional[ConfigGraphRAG] | None | Configuration object. Defaults to ConfigGraphRAG() with all defaults when None. |
Returns: KnowledgeGraph instance.
Example:
kg = KnowledgeGraph()
# Or with custom config:
config = ConfigGraphRAG(max_depth=3)
kg = KnowledgeGraph(config=config)
Node Management
add_node
kg.add_node(node: GraphNode) -> bool
Purpose: Adds a new node to the graph, or updates the existing node if a node with the same id already exists (upsert semantics).
Parameters:
| Parameter | Type | Description |
|---|---|---|
| node | GraphNode | The node to add or update. |
Returns: bool — True if the node was newly added; False if it replaced an existing node or if an internal error occurred.
Example:
node = GraphNode(id="Paris", content="Capital of France", node_type="city")
is_new = kg.add_node(node)
print("New node?", is_new)  # True
kg.add_node(node)  # Returns False (update)
remove_node
kg.remove_node(node_id: str) -> bool
Purpose: Removes a node and all edges connected to it (both incoming and outgoing). Rebuilds adjacency structures after removal to maintain consistency.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| node_id | str | ID of the node to remove. |
Returns: bool — True if the node was found and removed; False if it did not exist or an error occurred.
Example:
removed = kg.remove_node("Paris")
# All edges sourced from or targeting "Paris" are also deleted
get_node
kg.get_node(node_id: str) -> Optional[GraphNode]
Purpose: Retrieves a node by its ID. The primary lookup method for reading node data.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| node_id | str | The unique node identifier to look up. |
Returns: GraphNode if found; None if not.
Example:
node = kg.get_node("Paris")
if node:
    print(node.content)
Edge Management
add_edge
kg.add_edge(edge: GraphEdge) -> bool
Purpose: Adds a directed edge between two existing nodes. If an identical edge (source, relation, target triple) already exists, its weight is updated instead of adding a duplicate. Both outgoing and incoming adjacency structures are updated. For bidirectional edges, both directions are registered.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| edge | GraphEdge | The edge to add. Both edge.source and edge.target must already exist as nodes. |
Returns: bool — True on success; False if either endpoint node is missing or an error occurs.
Example:
edge = GraphEdge(source="EiffelTower", target="Paris", relation="located_in")
success = kg.add_edge(edge)
# Returns False if "EiffelTower" or "Paris" nodes don't exist yet
get_edge
kg.get_edge(source: str, target: str, relation: str) -> Optional[GraphEdge]
Purpose: Looks up a specific edge by its three-part identifier. O(1) lookup via the internal edge dictionary.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| source | str | Source node ID. |
| target | str | Target node ID. |
| relation | str | Relation type label. |
Returns: GraphEdge if found; None if no matching edge exists.
Example:
edge = kg.get_edge("EiffelTower", "Paris", "located_in")
if edge:
    print(f"Weight: {edge.weight}")
Graph Traversal
get_neighbors
kg.get_neighbors(
node_id: str,
max_depth: int = None,
include_incoming: bool = True,
) -> Set[str]
Purpose: Returns all node IDs reachable from node_id within max_depth hops, using BFS. Optionally includes nodes that have edges pointing to node_id (reverse adjacency). The starting node itself is excluded from the result.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| node_id | str | required | The node to expand from. |
| max_depth | int | config.max_depth | Maximum number of hops to traverse. 1 returns only direct neighbours. |
| include_incoming | bool | True | When True, nodes with edges pointing to node_id are also included in the traversal. |
Returns: Set[str] — set of neighbour node IDs. Returns an empty set if the node does not exist.
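A depth-limited BFS of this kind can be sketched as follows. The function `bfs_neighbors` and its two adjacency dictionaries are hypothetical, and the sketch assumes that `include_incoming` applies at every hop rather than only at the starting node, which the description above leaves open.

```python
from collections import deque
from typing import Dict, Set


def bfs_neighbors(
    outgoing: Dict[str, Set[str]],
    incoming: Dict[str, Set[str]],
    start: str,
    max_depth: int,
    include_incoming: bool = True,
) -> Set[str]:
    """Return every node within max_depth hops of start, excluding start itself."""
    visited = {start}
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth == max_depth:
            continue  # do not expand beyond the hop limit
        candidates = set(outgoing.get(node, ()))
        if include_incoming:
            # Assumption: reverse edges are followed at every hop.
            candidates |= incoming.get(node, set())
        for nxt in candidates:
            if nxt not in visited:
                visited.add(nxt)
                queue.append((nxt, depth + 1))
    visited.discard(start)
    return visited
```

A start node with no recorded edges simply yields an empty set, matching the documented behaviour for unknown nodes.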
Example:
# Get all nodes within 2 hops of "Paris" (outgoing and incoming)
neighbors = kg.get_neighbors("Paris", max_depth=2, include_incoming=True)
print(neighbors) # {"France", "EiffelTower", "EU", ...}
# Direct outgoing neighbours only
direct = kg.get_neighbors("Paris", max_depth=1, include_incoming=False)
find_path
kg.find_path(
start: str,
end: str,
max_length: Optional[int] = None,
) -> Optional[List[str]]
Purpose: Finds the shortest path between two nodes using BFS over the directed edge structure. Returns the first (shortest) path found, or None if no path exists within the length limit.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| start | str | required | ID of the starting node. |
| end | str | required | ID of the destination node. |
| max_length | Optional[int] | None | Maximum path length (number of nodes). No limit when None. |
Returns: List[str] — ordered list of node IDs from start to end (inclusive); or None if no path exists.
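The BFS shortest-path search can be illustrated with the sketch below. `shortest_path` is a hypothetical standalone function over a plain adjacency dictionary; returning `[start]` when start equals end is an assumption, since the reference does not specify that case.

```python
from collections import deque
from typing import Dict, List, Optional, Set


def shortest_path(
    adjacency: Dict[str, Set[str]],
    start: str,
    end: str,
    max_length: Optional[int] = None,
) -> Optional[List[str]]:
    """Breadth-first search over directed edges; max_length counts nodes in the path."""
    if start == end:
        return [start]  # assumption: a trivial path is returned
    visited = {start}
    queue = deque([[start]])
    while queue:
        path = queue.popleft()
        if max_length is not None and len(path) >= max_length:
            continue  # one more node would exceed the limit
        for nxt in sorted(adjacency.get(path[-1], ())):
            if nxt in visited:
                continue
            if nxt == end:
                return path + [nxt]
            visited.add(nxt)
            queue.append(path + [nxt])
    return None
```

BFS explores paths in order of increasing length, so the first path that reaches `end` is a shortest one. Sorting the neighbours only makes the choice among equally short paths deterministic.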
Example:
path = kg.find_path("EiffelTower", "France")
if path:
    print(" → ".join(path))
    # EiffelTower → Paris → France
else:
    print("No path found")
find_all_paths
kg.find_all_paths(
start: str,
end: str,
max_length: int = 5,
) -> List[List[str]]
Purpose: Finds all acyclic paths between two nodes up to max_length nodes long, using DFS. Use for graph exploration, relationship discovery, or reasoning chain analysis. Note: can be slow on dense graphs; use a reasonable max_length.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| start | str | required | ID of the starting node. |
| end | str | required | ID of the destination node. |
| max_length | int | 5 | Maximum number of nodes in any returned path. |
Returns: List[List[str]] — list of paths, where each path is an ordered list of node IDs. Returns an empty list if either node does not exist or no paths are found.
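Enumerating acyclic paths with DFS and a node-count limit might look like the sketch below. `all_simple_paths` is a hypothetical helper over a plain adjacency dictionary, not the method's actual implementation.

```python
from typing import Dict, List, Set


def all_simple_paths(
    adjacency: Dict[str, Set[str]],
    start: str,
    end: str,
    max_length: int = 5,
) -> List[List[str]]:
    """Depth-first enumeration of acyclic paths with at most max_length nodes."""
    paths: List[List[str]] = []

    def dfs(path: List[str]) -> None:
        node = path[-1]
        if node == end:
            paths.append(list(path))
            return
        if len(path) >= max_length:
            return  # adding another node would exceed the limit
        for nxt in sorted(adjacency.get(node, ())):
            if nxt not in path:  # skip nodes already on this path (no cycles)
                path.append(nxt)
                dfs(path)
                path.pop()

    dfs([start])
    return paths
```

The number of simple paths can grow exponentially with graph density, which is why a small `max_length` matters on well-connected graphs.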
Example:
paths = kg.find_all_paths("EiffelTower", "Europe", max_length=4)
for path in paths:
    print(" → ".join(path))
# EiffelTower → Paris → France → Europe
# EiffelTower → Paris → EU → Europe
Analytics & Subgraph
get_subgraph
kg.get_subgraph(node_ids: Set[str]) -> KnowledgeGraph
Purpose: Extracts a new KnowledgeGraph containing only the specified nodes and the edges between them. The resulting subgraph inherits the same ConfigGraphRAG as the parent. Useful for isolating a topic cluster, exporting a domain-specific graph, or analysis.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| node_ids | Set[str] | Set of node IDs to include. Node IDs not present in the graph are silently ignored. |
Returns: KnowledgeGraph — a new independent graph instance containing only the specified nodes and the edges that connect them.
Example:
europe_nodes = {"Paris", "France", "Berlin", "Germany"}
europe_graph = kg.get_subgraph(europe_nodes)
print(europe_graph)
# KnowledgeGraph(nodes=4, edges=3)
get_node_degree
kg.get_node_degree(node_id: str) -> Dict[str, int]
Purpose: Returns the in-degree, out-degree, and total degree of a node in O(1) time using pre-built counters. Useful for identifying hub nodes (high total degree) or source/sink nodes.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| node_id | str | ID of the node to analyse. |
Returns: Dict[str, int] with three keys:
| Key | Description |
|---|---|
| in_degree | Number of edges pointing to this node. |
| out_degree | Number of edges originating from this node. |
| total_degree | Sum of in + out degree. |
Returns {"in_degree": 0, "out_degree": 0, "total_degree": 0} if the node does not exist.
Example:
degree = kg.get_node_degree("Paris")
# {"in_degree": 2, "out_degree": 1, "total_degree": 3}
if degree["total_degree"] == 0:
    print("Isolated node: no connections.")
get_connected_components
kg.get_connected_components() -> List[Set[str]]
Purpose: Identifies all connected components of the graph treating it as undirected (both forward and reverse edges considered). Each component is a set of node IDs where every node can reach every other node. Use for detecting isolated clusters, graph fragmentation, or validating graph connectivity.
Parameters: None.
Returns: List[Set[str]] — list of components, where each component is a set of node IDs. A single-node graph returns one component containing that node.
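The undirected-view component search can be sketched with an iterative traversal. `connected_components` below is a hypothetical standalone function taking node IDs and (source, target) pairs; the library's internal representation is not shown in this reference.

```python
from typing import Dict, Iterable, List, Set, Tuple


def connected_components(
    nodes: Iterable[str],
    edges: Iterable[Tuple[str, str]],
) -> List[Set[str]]:
    """Group nodes into components, ignoring edge direction."""
    undirected: Dict[str, Set[str]] = {n: set() for n in nodes}
    for source, target in edges:
        # Register the edge in both directions to get the undirected view.
        undirected.setdefault(source, set()).add(target)
        undirected.setdefault(target, set()).add(source)

    seen: Set[str] = set()
    components: List[Set[str]] = []
    for node in undirected:
        if node in seen:
            continue
        component: Set[str] = set()
        stack = [node]
        while stack:
            current = stack.pop()
            if current in component:
                continue
            component.add(current)
            stack.extend(undirected[current] - component)
        seen |= component
        components.append(component)
    return components
```

An isolated node ends up in a component of its own, consistent with the single-node case described above.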
Example:
components = kg.get_connected_components()
print(f"Graph has {len(components)} connected components")
for i, comp in enumerate(components):
    print(f" Component {i+1}: {comp}")
get_stats
kg.get_stats() -> Dict[str, Any]
Purpose: Returns a comprehensive structural statistics summary of the knowledge graph. Useful for monitoring graph growth, detecting anomalies, and dashboards.
Parameters: None.
Returns: Dict[str, Any] with the following keys:
| Key | Type | Description |
|---|---|---|
| num_nodes | int | Total number of nodes in the graph. |
| num_edges | int | Total number of directed edges. |
| avg_degree | float | Average out-degree across all nodes. |
| max_degree | int | Maximum out-degree of any single node. |
| min_degree | int | Minimum out-degree of any single node. |
| num_components | int | Number of connected components (undirected view). |
| density | float | Edge density: num_edges / (num_nodes × (num_nodes - 1)). 0 for single-node graphs. |
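As a worked illustration of the density formula, the hypothetical helper below computes it directly. Returning 0.0 for an empty graph as well as a single-node graph is an assumption made to avoid division by zero.

```python
def density(num_nodes: int, num_edges: int) -> float:
    """Directed edge density: num_edges / (num_nodes * (num_nodes - 1))."""
    if num_nodes < 2:
        return 0.0  # assumption: empty graphs are treated like single-node graphs
    return num_edges / (num_nodes * (num_nodes - 1))
```

For example, a graph with 4 nodes and 3 directed edges has density 3 / (4 × 3) = 0.25, and a value of 1.0 means every ordered pair of distinct nodes is connected.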
Example:
stats = kg.get_stats()
print(f"Nodes: {stats['num_nodes']} Edges: {stats['num_edges']}")
print(f"Density: {stats['density']:.4f} Components: {stats['num_components']}")
validate_integrity
kg.validate_integrity() -> Dict[str, List[str]]
Purpose: Scans the graph for structural integrity issues. Detects orphan edges (edges referencing non-existent nodes), duplicate edges, and self-loops. Run after bulk imports or before saving to catch data quality problems.
Parameters: None.
Returns: Dict[str, List[str]] with three keys:
| Key | Description |
|---|---|
| orphan_edges | Descriptions of edges whose source or target node no longer exists in the graph. |
| duplicate_edges | Edge IDs of edges with identical (source, target, relation) triples. |
| self_loops | Edge IDs of edges where source == target. |
All three lists are empty when the graph is clean.
Example:
issues = kg.validate_integrity()
if any(issues.values()):
    print("⚠️ Integrity issues found:")
    for issue_type, items in issues.items():
        if items:
            print(f" {issue_type}: {items}")
else:
    print("✅ Graph integrity OK")
Class: GraphRAG
from fennec_community.rag.types.graph_rag import GraphRAG
The top-level orchestration class that combines a KnowledgeGraph with a VectorDatabase and an optional LLM to deliver graph-augmented retrieval and answer generation.
__init__
GraphRAG(
vector_db: Any,
llm: Optional[Any] = None,
config: Optional[ConfigGraphRAG] = None,
)
Purpose: Initialises the GraphRAG system. Creates an empty KnowledgeGraph and sets up all internal mapping dictionaries. No documents are loaded at construction time.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| vector_db | Any | required | A VectorDatabase instance. Must expose .search(query, top_k, score_threshold), .add(chunks), .save(path), .load(path, embedder), .get_stats(), .chunks, and .embedder. |
| llm | Optional[Any] | None | Any object with a .generate(prompt, **kwargs) -> str method. When None, query() returns the raw graph context string instead of an LLM-generated answer. |
| config | Optional[ConfigGraphRAG] | None | Configuration object. Defaults to ConfigGraphRAG() when None. |
Returns: GraphRAG instance.
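Because the llm argument only needs a `.generate(prompt, **kwargs) -> str` method, a small stub is enough for tests or offline experiments. `EchoLLM` below is a hypothetical test double, not a class shipped with the library.

```python
from typing import Any, List


class EchoLLM:
    """Hypothetical stub that satisfies the documented .generate contract."""

    def __init__(self) -> None:
        self.prompts: List[str] = []  # every prompt received, for later inspection

    def generate(self, prompt: str, **kwargs: Any) -> str:
        # Extra keyword arguments are accepted and ignored.
        self.prompts.append(prompt)
        return f"[stub answer for {len(prompt)} prompt characters]"
```

Recording the prompts makes it possible to check what context was assembled for a query without calling a real model.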
Example:
from fennec_community.rag.types.graph_rag import GraphRAG, ConfigGraphRAG
config = ConfigGraphRAG(k=10, context_depth=2, embedder_name="all-MiniLM-L6-v2")
rag = GraphRAG(vector_db=my_vdb, llm=my_llm, config=config)
print(rag)
# GraphRAG(nodes=0, edges=0, llm=MyLLM)
Document Ingestion
add_document_with_relations
rag.add_document_with_relations(
content: str,
entities: List[Dict[str, Any]],
relations: List[Dict[str, Any]],
doc_id: str,
metadata: Optional[Dict] = None,
) -> Dict[str, Any]
Purpose: The primary ingestion method. Registers a document's entities as graph nodes, its relationships as graph edges, and its entity text chunks in the vector database, all in one atomic operation. This is the correct way to populate the system.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| content | str | The full raw text of the document. Stored internally for LLM context construction. |
| entities | List[Dict[str, Any]] | List of entity dicts. Each dict must have an "id" key and should have at least one of "text", "name", or "content" for the displayable label. Optional keys: "type" (defaults to "entity"), "metadata". |
| relations | List[Dict[str, Any]] | List of relation dicts. Each must have "source" and "target" (node IDs). Optional: "type" (defaults to "related"), "weight" (defaults to 1.0), "bidirectional" (defaults to False). |
| doc_id | str | Unique document identifier. Used to group chunks and full-text lookup for LLM context. |
| metadata | Optional[Dict] | Extra metadata propagated to all nodes and chunks created from this document. |
Returns: Dict[str, Any] with:
| Key | Type | Description |
|---|---|---|
| nodes_added | int | Number of graph nodes created. |
| edges_added | int | Number of graph edges created. |
| chunks_created | int | Number of vector DB chunks created (one per entity). |
Raises: Re-raises any exception from node/edge creation or vector DB write. Check logs for details.
Entity dict fields:
| Field | Required | Description |
|---|---|---|
| id | ✅ | Unique node identifier. |
| text / name / content | Recommended | Display text (tried in that order). Falls back to id. |
| type | ➖ | Node type label. Defaults to "entity". |
| metadata | ➖ | Additional per-entity metadata. |
Relation dict fields:
| Field | Required | Description |
|---|---|---|
| source | ✅ | Source node ID (must exist as an entity id). |
| target | ✅ | Target node ID (must exist as an entity id). |
| type | ➖ | Relation label. Defaults to "related". |
| weight | ➖ | Edge weight. Defaults to 1.0. |
| bidirectional | ➖ | If True, edge traversal works in both directions. Defaults to False. |
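Since relations must reference entity ids, it can help to check a payload before ingesting it. The sketch below is a hypothetical pre-flight helper, `check_ingestion_payload`, built only from the rules stated in this reference (required keys, no self-loops, non-negative weights); it is not part of the library.

```python
from typing import Any, Dict, List


def check_ingestion_payload(
    entities: List[Dict[str, Any]],
    relations: List[Dict[str, Any]],
) -> List[str]:
    """Return human-readable problems; an empty list means the payload looks consistent."""
    problems: List[str] = []
    ids = set()
    for i, entity in enumerate(entities):
        entity_id = entity.get("id")
        if not entity_id:
            problems.append(f"entity #{i} has no 'id'")
        else:
            ids.add(entity_id)

    for i, relation in enumerate(relations):
        for endpoint in ("source", "target"):
            node_id = relation.get(endpoint)
            if node_id not in ids:
                problems.append(f"relation #{i} {endpoint} '{node_id}' is not an entity id")
        source = relation.get("source")
        if source is not None and source == relation.get("target"):
            problems.append(f"relation #{i} is a self-loop")
        if relation.get("weight", 1.0) < 0:
            problems.append(f"relation #{i} has a negative weight")
    return problems
```

Running a check like this first turns a failed ingestion into a list of specific payload errors that can be fixed before retrying.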
Example:
result = rag.add_document_with_relations(
    content="""
    The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France.
    It was named after the engineer Gustave Eiffel.
    """,
    entities=[
        {"id": "EiffelTower", "text": "Eiffel Tower", "type": "landmark"},
        {"id": "Paris", "text": "Paris", "type": "city"},
        {"id": "France", "text": "France", "type": "country"},
        {"id": "GEiffel", "text": "Gustave Eiffel", "type": "person"},
    ],
    relations=[
        {"source": "EiffelTower", "target": "Paris", "type": "located_in"},
        {"source": "Paris", "target": "France", "type": "capital_of"},
        {"source": "EiffelTower", "target": "GEiffel", "type": "named_after"},
    ],
    doc_id="doc_eiffel_001",
    metadata={"source": "Wikipedia", "language": "en"},
)
print(result)
# {"nodes_added": 4, "edges_added": 3, "chunks_created": 4}
Retrieval
retrieve_with_context
rag.retrieve_with_context(
    query: str,
    k: int = None,
    context_depth: int = None,
    min_similarity: float = 0.0,
    combine_scores: bool = True,
) -> List[Dict[str, Any]]
Purpose: The core graph-augmented retrieval method. Performs vector similarity search to find seed nodes, then expands context through the knowledge graph using BFS. Each neighbour node receives a decayed score (score × 0.5^depth). Results are ranked by final score and truncated to k.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | — | The natural-language query to search for. |
| k | int | config.k | Maximum number of results to return. |
| context_depth | int | config.context_depth | BFS expansion depth. 0 = vector search only, no graph expansion. |
| min_similarity | float | 0.0 | Minimum cosine similarity score to accept from vector search. Filters out low-quality seed nodes. |
| combine_scores | bool | True | When True, a node already in the expanded set has its score updated to the maximum of its current and new decayed score. When False, the first-seen score is kept. |
Returns: List[Dict[str, Any]] — list of result dicts sorted by score descending, truncated to k. See Return Value Reference for the result dict schema. Returns an empty list on error or no results.
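The seed, expand, rank, truncate flow described above can be sketched without the library. The function `expand_with_decay` and the adjacency-dict graph representation are hypothetical stand-ins rather than the module's internals. The sketch assumes seeds have already passed the min_similarity filter and that a node `depth` hops from a seed is offered `seed_score * 0.5**depth`.

```python
from collections import deque
from typing import Dict, List, Set, Tuple


def expand_with_decay(
    seeds: Dict[str, float],
    adjacency: Dict[str, Set[str]],
    context_depth: int,
    k: int,
    combine_scores: bool = True,
) -> List[Tuple[str, float]]:
    """BFS from each seed; a node `depth` hops away is offered seed_score * 0.5**depth."""
    scores = dict(seeds)  # seeds keep their full vector-similarity score
    for seed_id, seed_score in seeds.items():
        visited = {seed_id}
        queue = deque([(seed_id, 0)])
        while queue:
            node_id, depth = queue.popleft()
            if depth >= context_depth:
                continue  # context_depth=0 therefore means no expansion at all
            for neighbour in sorted(adjacency.get(node_id, ())):
                if neighbour in visited:
                    continue
                visited.add(neighbour)
                decayed = seed_score * 0.5 ** (depth + 1)
                if neighbour not in scores:
                    scores[neighbour] = decayed
                elif combine_scores:
                    scores[neighbour] = max(scores[neighbour], decayed)
                # combine_scores=False: the first-seen score is left untouched
                queue.append((neighbour, depth + 1))
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return ranked[:k]


adjacency = {"EiffelTower": {"Paris", "GEiffel"}, "Paris": {"France"}}
ranked = expand_with_decay({"EiffelTower": 0.8}, adjacency, context_depth=2, k=5)
for node_id, score in ranked:
    print(f"{node_id}: {score:.2f}")
```

With a single seed scored 0.8, the two direct neighbours end up at 0.40 and the 2-hop node at 0.20, so raising context_depth adds lower-ranked context without displacing the direct vector matches.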
Example:
results = rag.retrieve_with_context(
    query="Where is the Eiffel Tower?",
    k=5,
    context_depth=2,
    min_similarity=0.3,
)
for r in results:
    print(f"[{r['type']}] {r['id']} — score: {r['score']:.3f}")
    print(f" Content: {r['content']}")
    print(f" Neighbors: {[n['id'] for n in r['neighbors']]}")
semantic_search
rag.semantic_search(
    query: str,
    top_k: int = 5,
    include_graph_info: bool = True,
) -> List[Dict[str, Any]]
Purpose: Pure vector similarity search without graph expansion. Returns chunk-level results directly from the vector database. Optionally enriches each result with its linked graph node's type and neighbour count. Use this when you want raw retrieval without the BFS overhead, or to compare graph-augmented vs. flat retrieval.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | — | The natural-language query. |
| top_k | int | 5 | Number of results to return. |
| include_graph_info | bool | True | When True, enriches each result with the linked graph node's id, type, and neighbors_count. |
Returns: List[Dict[str, Any]] — each entry contains:
| Key | Type | Description |
|---|---|---|
| text | str | The chunk's text content. |
| score | float | Cosine similarity score from the vector DB. |
| metadata | Dict | Chunk metadata (e.g., doc_id, node_id). |
| graph_node | Dict | (only if include_graph_info=True and a node is linked) Contains id, type, neighbors_count. |
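The optional graph_node enrichment can be pictured as a lookup from each chunk's metadata into the graph. The sketch below assumes the link is carried in `metadata["node_id"]`, as the metadata row above suggests. The helper `enrich_with_graph_info` and its plain-dict inputs are hypothetical, not the library's own data structures.

```python
from typing import Any, Dict, List, Set


def enrich_with_graph_info(
    hits: List[Dict[str, Any]],
    node_types: Dict[str, str],
    adjacency: Dict[str, Set[str]],
) -> List[Dict[str, Any]]:
    """Attach a graph_node entry to each hit whose metadata links to a known node."""
    enriched = []
    for hit in hits:
        result = dict(hit)  # shallow copy so the caller's hits stay untouched
        node_id = hit.get("metadata", {}).get("node_id")
        if node_id in node_types:
            result["graph_node"] = {
                "id": node_id,
                "type": node_types[node_id],
                "neighbors_count": len(adjacency.get(node_id, ())),
            }
        enriched.append(result)
    return enriched


hits = [
    {"text": "Paris", "score": 0.91, "metadata": {"doc_id": "geo_001", "node_id": "Paris"}},
    {"text": "Unlinked chunk", "score": 0.40, "metadata": {"doc_id": "geo_001"}},
]
enriched = enrich_with_graph_info(
    hits,
    node_types={"Paris": "city", "France": "country"},
    adjacency={"Paris": {"France"}},
)
for item in enriched:
    print(item["text"], "->", item.get("graph_node"))
```

Because the key is only present for linked chunks, callers should test for it with `"graph_node" in r` or `r.get("graph_node")` rather than indexing directly.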
Example:
results = rag.semantic_search("Eiffel Tower engineer", top_k=3)
for r in results:
    print(f"Score {r['score']:.3f}: {r['text']}")
    if "graph_node" in r:
        print(f" → Linked node: {r['graph_node']['id']} ({r['graph_node']['type']})")
        print(f" → Neighbours: {r['graph_node']['neighbors_count']}")
get_node_context
rag.get_node_context(
    node_id: str,
    max_depth: int = 2,
    include_chunks: bool = True,
) -> Dict[str, Any]
Purpose: Returns comprehensive contextual information about a specific graph node — its own data, its neighbours grouped by traversal depth, and optionally all associated vector DB chunks. Use for graph exploration, entity inspection, or debugging the graph structure.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| node_id | str | — | ID of the node to inspect. |
| max_depth | int | 2 | How many hops of neighbours to include. Depth 1 = direct neighbours; depth 2 = neighbours of neighbours, etc. |
| include_chunks | bool | True | When True, all vector DB chunks linked to this node are included in the output. |
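Grouping neighbours by hop count, as max_depth implies, amounts to a level-by-level BFS. The sketch below shows that grouping on a plain adjacency dict. The function `neighbors_by_depth` is a hypothetical illustration that returns node IDs only, and it assumes every depth up to max_depth gets a key even when no node sits at that distance.

```python
from typing import Dict, List, Set


def neighbors_by_depth(
    adjacency: Dict[str, Set[str]], start: str, max_depth: int
) -> Dict[int, List[str]]:
    """Group nodes by their minimum hop distance from `start`, up to max_depth."""
    levels: Dict[int, List[str]] = {}
    visited = {start}
    frontier = [start]
    for depth in range(1, max_depth + 1):
        next_frontier = []
        for node_id in frontier:
            for neighbour in sorted(adjacency.get(node_id, ())):
                if neighbour not in visited:
                    visited.add(neighbour)  # a node is listed at its shortest distance only
                    next_frontier.append(neighbour)
        levels[depth] = next_frontier
        frontier = next_frontier
    return levels


adjacency = {"EiffelTower": {"Paris", "GEiffel"}, "Paris": {"France"}}
levels = neighbors_by_depth(adjacency, "EiffelTower", max_depth=2)
print(levels)
```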
Returns: Dict[str, Any] with the following structure (returns {} if the node does not exist):
{
    "node": {
        "id": str,
        "content": str,
        "type": str,
        "metadata": dict,
    },
    "neighbors_by_depth": {
        1: [{"id": str, "content": str, "type": str}, ...],
        2: [{"id": str, "content": str, "type": str}, ...],
        # ... up to max_depth
    },
    "related_chunks": [  # only when include_chunks=True
        {"text": str, "metadata": dict},
        ...
    ],
}
Example:
ctx = rag.get_node_context("Paris", max_depth=2, include_chunks=True)
print("Node:", ctx["node"])
print("Direct neighbours:", ctx["neighbors_by_depth"][1])
print("2-hop neighbours:", ctx["neighbors_by_depth"][2])
print("Associated chunks:", len(ctx["related_chunks"]))
Query & Generation
query
rag.query(
    query: str,
    k: int = None,
    language: str = "ar",
    include_sources: bool = False,
    **llm_kwargs,
) -> str
Purpose: The main end-to-end query interface. Orchestrates the full pipeline: retrieve_with_context → _build_graph_context → _build_prompt → llm.generate. When no LLM is configured, returns the raw graph context string directly.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | — | The natural-language question to answer. Returns a warning string if empty or whitespace. |
| k | int | config.k | Number of nodes to retrieve and include in context. |
| language | str | "ar" | Language for the LLM prompt template. Supported values: "ar" (Arabic) and "en" (English). Other values result in None from the prompt builder. |
| include_sources | bool | False | When True, appends a formatted sources list to the answer, showing each result's type, ID, and score. |
| **llm_kwargs | Any | — | Additional keyword arguments forwarded directly to llm.generate(prompt, **llm_kwargs). Useful for temperature, max_tokens, etc. |
Returns: str — the LLM-generated answer (or raw context if no LLM); an error string starting with "❌" on failure; or "⚠️ Enter a correct query" for empty input.
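The three return cases above (answer, error string, empty-query warning) follow from a simple control flow, sketched here with injected callables in place of the real retriever and LLM. The function `answer_query`, the context layout, and the prompt wording are hypothetical. Appending sources when no LLM is configured is also an assumption, since the reference does not say how those two options interact.

```python
from typing import Any, Callable, Dict, List, Optional


def answer_query(
    query: str,
    retrieve: Callable[[str], List[Dict[str, Any]]],
    llm_generate: Optional[Callable[[str], str]] = None,
    include_sources: bool = False,
) -> str:
    """Hypothetical stand-in for the retrieve -> context -> prompt -> generate flow."""
    if not query or not query.strip():
        return "⚠️ Enter a correct query"
    try:
        results = retrieve(query)
        context = "\n".join(f"[{r['type']}] {r['id']}: {r['content']}" for r in results)
        if llm_generate is None:
            answer = context  # no LLM configured: return the raw context
        else:
            answer = llm_generate(f"Context:\n{context}\n\nQuestion: {query}")
        if include_sources:
            lines = [f"• [{r['type']}] {r['id']} (score: {r['score']:.2f})" for r in results]
            answer += "\n📚 Source :\n" + "\n".join(lines)
        return answer
    except Exception as exc:  # failures are reported as a string, not raised
        return f"❌ {exc}"


hits = [{"id": "EiffelTower", "type": "landmark", "content": "Eiffel Tower", "score": 0.92}]
print(answer_query("Where is the Eiffel Tower?", lambda q: hits))
print(answer_query("   ", lambda q: hits))
```

Because errors come back as strings, a caller that needs to distinguish success from failure has to check the "❌" and "⚠️" prefixes instead of relying on exceptions.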
Example:
# Arabic (default)
answer = rag.query("أين يقع برج إيفل؟", language="ar")
print(answer)
# English with sources
answer = rag.query(
    "Where is the Eiffel Tower?",
    k=5,
    language="en",
    include_sources=True,
    temperature=0.1,  # forwarded to llm.generate
)
print(answer)
# "The Eiffel Tower is located in Paris, France..."
# 📚 Source :
# • [landmark] EiffelTower (score: 0.92)
# • [city] Paris (score: 0.87)
generate
rag.generate(query: str, k: int = None, **kwargs) -> str
Purpose: Alias for query(). Provided for compatibility with other RAG implementations that expect a .generate(query) interface (including the FederatedRAG federation layer).
Parameters: Identical to query except language and include_sources must be passed via **kwargs.
Returns: Same as query().
Example:
# Used by FederatedRAG when this GraphRAG is registered as a source:
answer = rag.generate("What is the Eiffel Tower?")
# Equivalent to:
answer = rag.query("What is the Eiffel Tower?")
Observability
get_statistics
rag.get_statistics() -> Dict[str, Any]
Purpose: Returns a unified statistics snapshot covering both the knowledge graph and the vector database, along with internal mapping sizes. Use for monitoring dashboards, debugging, and capacity planning.
Parameters: None.
Returns: Dict[str, Any] with three top-level keys:
| Key | Type | Description |
|---|---|---|
| graph | Dict | Output of KnowledgeGraph.get_stats() — node/edge counts, density, components, degree stats. |
| vector_db | Dict | Output of VectorDatabase.get_stats() — index size, embedding dimension, etc. |
| mappings | Dict | Internal mapping table sizes: chunks_to_nodes (total chunk↔node links) and nodes_with_chunks (nodes that have at least one chunk). |
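The two mapping counters can be derived from the chunk and node lookup tables. A minimal sketch follows, assuming the tables are plain dicts shaped like the chunk_to_node and node_to_chunks entries saved in mappings.json. The helper `mapping_stats` is hypothetical.

```python
from typing import Dict, List


def mapping_stats(
    chunk_to_node: Dict[str, str],
    node_to_chunks: Dict[str, List[str]],
) -> Dict[str, int]:
    """Count chunk-to-node links and nodes that own at least one chunk."""
    return {
        "chunks_to_nodes": len(chunk_to_node),
        "nodes_with_chunks": sum(1 for chunks in node_to_chunks.values() if chunks),
    }


stats = mapping_stats(
    chunk_to_node={"entity_Paris": "Paris", "entity_France": "France"},
    node_to_chunks={"Paris": ["entity_Paris"], "France": ["entity_France"], "Orphan": []},
)
print(stats)
```

A gap between the graph's node count and nodes_with_chunks is a quick signal that some nodes cannot be reached as vector-search seeds.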
Example:
stats = rag.get_statistics()
print("Graph:", stats["graph"])
print("Vector DB:", stats["vector_db"])
print("Mappings:", stats["mappings"])
# {"graph": {"num_nodes": 150, "num_edges": 320, "density": 0.014, ...},
# "vector_db": {"num_vectors": 150, "dim": 384, ...},
# "mappings": {"chunks_to_nodes": 150, "nodes_with_chunks": 150}}
Persistence
save
rag.save(path: str) -> None
Purpose: Persists the entire GraphRAG system state to disk. Creates the target directory if it does not exist. Saves three artefacts: the vector database, the graph JSON, and the chunk-node mapping JSON.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| path | str | Directory path where the system state will be saved. Created automatically if it does not exist. |
Returns: None
Saved files layout:
<path>/
├── vector_db/ ← VectorDatabase.save() output
├── graph.json ← nodes[] and edges[] arrays
└── mappings.json ← chunk_to_node, node_to_chunks, doc_contents
Example:
rag.save("./saved_graphrag/v1")
load
rag.load(path: str) -> None
Purpose: Restores GraphRAG system state from a directory previously written by save(). After loading, the vector database and the chunk-node mappings are available again; the knowledge graph needs the extra step described in the note that follows.
⚠️ Note:
load restores the vector database and mappings but does not reconstruct the KnowledgeGraph from graph.json automatically in the current implementation. The graph must be rebuilt by calling add_document_with_relations again, or by manually loading nodes and edges from the saved JSON.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| path | str | Directory path previously written by save(). |
Returns: None
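Since the graph is not rebuilt automatically, one option is to read graph.json by hand. The sketch below parses the documented nodes and edges arrays into plain Python structures. The function `load_graph_json` is hypothetical, and it stops short of calling the library: feeding the parsed entries into KnowledgeGraph.add_node and add_edge, as in the graph inspection example, is left as an assumed next step.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Set, Tuple


def load_graph_json(state_dir: str) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Set[str]]]:
    """Read <state_dir>/graph.json and return (nodes by id, adjacency sets)."""
    data = json.loads((Path(state_dir) / "graph.json").read_text(encoding="utf-8"))
    nodes = {node["id"]: node for node in data.get("nodes", [])}
    adjacency: Dict[str, Set[str]] = {node_id: set() for node_id in nodes}
    for edge in data.get("edges", []):
        source, target = edge["source"], edge["target"]
        if source not in nodes or target not in nodes:
            continue  # skip edges whose endpoints were not saved
        adjacency[source].add(target)
        if edge.get("bidirectional", False):
            adjacency[target].add(source)
    return nodes, adjacency


# Demo: write a tiny graph.json in the documented shape, then read it back.
with tempfile.TemporaryDirectory() as state_dir:
    graph = {
        "nodes": [
            {"id": "Paris", "content": "Paris", "node_type": "city", "metadata": {}},
            {"id": "France", "content": "France", "node_type": "country", "metadata": {}},
        ],
        "edges": [
            {"source": "Paris", "target": "France", "relation": "capital_of",
             "weight": 1.0, "bidirectional": False},
        ],
    }
    (Path(state_dir) / "graph.json").write_text(json.dumps(graph), encoding="utf-8")
    nodes, adjacency = load_graph_json(state_dir)

print(sorted(nodes))
print(adjacency)
```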
Example:
rag2 = GraphRAG(vector_db=new_vdb, llm=my_llm)
rag2.load("./saved_graphrag/v1")
answer = rag2.query("Where is the Eiffel Tower?", language="en")
Async API
All async methods use asyncio.to_thread to run the synchronous implementations in a thread pool, making them safe to call from async frameworks like FastAPI, aiohttp, or when used inside FederatedRAG's async query engine.
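The thread-offloading pattern itself is small enough to show in isolation. In the sketch below, `slow_query` is a hypothetical blocking function standing in for a synchronous call such as query(), and `aquery_sketch` is an illustrative wrapper, not the library's aquery. `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time


def slow_query(query: str) -> str:
    """Stand-in for a blocking call such as a synchronous query()."""
    time.sleep(0.05)
    return f"answer to: {query}"


async def aquery_sketch(query: str) -> str:
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so the event loop stays free to serve other coroutines.
    return await asyncio.to_thread(slow_query, query)


async def main() -> list:
    # Both calls overlap instead of running back to back.
    return await asyncio.gather(aquery_sketch("first"), aquery_sketch("second"))


answers = asyncio.run(main())
print(answers)
```

Offloading keeps the event loop responsive, but each in-flight call still occupies a worker thread, so throughput stays bounded by the default executor's thread pool size.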
aquery
async def aquery(query: str, k: int = None, **kwargs) -> str
Purpose: Async version of query(). Runs query() in a thread pool executor so it does not block the event loop. All parameters and return values are identical to query().
Example:
import asyncio
async def main():
    answer = await rag.aquery(
        "Where is the Eiffel Tower?",
        k=5,
        language="en",
        include_sources=True,
    )
    print(answer)
asyncio.run(main())
agenerate
async def agenerate(query: str, k: int = None, **kwargs) -> str
Purpose: Async alias for aquery(). Provided for compatibility with async-aware federation systems (e.g., FederatedRAG.query_async). All parameters and return values are identical to aquery().
Example:
# Called automatically by FederatedRAG when GraphRAG is a registered source
answer = await rag.agenerate("What is the Eiffel Tower?")
aretrieve
async def aretrieve(
    query: str,
    top_k: int = None,
    k: int = None,
    **kwargs,
) -> List[Dict[str, Any]]
Purpose: Async version of retrieve_with_context(). Accepts both k and top_k as aliases for the result count (both are equivalent; k takes precedence when both are provided). Runs retrieval in a thread pool. Used by FederatedRAG's _call_generate when this GraphRAG is registered as a federated source.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | — | The natural-language query to retrieve for. |
| top_k | int | None | Alias for k. Used when the caller follows the top_k convention. |
| k | int | None | Number of results. Takes precedence over top_k. Falls back to config.k. |
| **kwargs | Any | — | Forwarded to retrieve_with_context. |
Returns: List[Dict[str, Any]] — same as retrieve_with_context().
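The precedence between k, top_k, and the configured default can be written as a three-step fallback. The helper `resolve_k` below is hypothetical and only mirrors the rule stated in the parameter table.

```python
from typing import Optional


def resolve_k(k: Optional[int], top_k: Optional[int], default_k: int) -> int:
    """k wins over top_k; the configured default applies when neither is given."""
    if k is not None:
        return k
    if top_k is not None:
        return top_k
    return default_k


print(resolve_k(k=3, top_k=7, default_k=5))        # both given: k is used
print(resolve_k(k=None, top_k=7, default_k=5))     # only top_k given
print(resolve_k(k=None, top_k=None, default_k=5))  # neither given: config default
```

Comparing against None rather than relying on truthiness means an explicit 0 is honoured instead of silently falling through to the default.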
Example:
# Run inside a coroutine, e.g. a FastAPI handler
results = await rag.aretrieve("Paris landmarks", top_k=5)
for r in results:
    print(r["id"], r["score"])
Return Value Reference
Result dict schema (from retrieve_with_context and _build_results)
Each element in the returned list has the following structure:
| Key | Type | Description |
|---|---|---|
| id | str | Node ID. |
| content | str | Node's text content. |
| type | str | Node type label (e.g., "city", "person", "landmark"). |
| score | float | Combined relevance score (vector similarity × decay factor for expanded nodes). |
| metadata | Dict[str, Any] | Node metadata dict (includes doc_id and any custom fields). |
| related_chunks | List[Dict] | Vector DB chunks linked to this node. Each chunk has text and metadata. |
| neighbors | List[Dict] | Up to 5 direct graph neighbours. Each neighbour has id, content, and type. |
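For callers who want static checking on these dicts, the schema can be mirrored with typing.TypedDict. The class names below (`NeighborInfo`, `ChunkInfo`, `RetrievalResult`) and the `top_sources` helper are hypothetical. The library returns plain dicts and is not stated to export any such types.

```python
from typing import Any, Dict, List, TypedDict


class NeighborInfo(TypedDict):
    id: str
    content: str
    type: str


class ChunkInfo(TypedDict):
    text: str
    metadata: Dict[str, Any]


class RetrievalResult(TypedDict):
    id: str
    content: str
    type: str
    score: float
    metadata: Dict[str, Any]
    related_chunks: List[ChunkInfo]
    neighbors: List[NeighborInfo]


def top_sources(results: List[RetrievalResult], min_score: float) -> List[str]:
    """Format the results at or above min_score as short source labels."""
    return [
        f"{r['id']} ({r['type']}, {r['score']:.2f})"
        for r in results
        if r["score"] >= min_score
    ]


example: RetrievalResult = {
    "id": "Paris",
    "content": "Paris",
    "type": "city",
    "score": 0.87,
    "metadata": {"doc_id": "geo_001"},
    "related_chunks": [{"text": "Paris", "metadata": {"node_id": "Paris"}}],
    "neighbors": [{"id": "France", "content": "France", "type": "country"}],
}
labels = top_sources([example], min_score=0.5)
print(labels)
```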
Graph Context Expansion — Score Decay Model
When context_depth > 0, the BFS expander assigns decayed scores to neighbour nodes:
neighbor_score = seed_score × 0.5^depth
Here depth is the neighbour's hop distance from the seed, i.e. current_depth + 1 when the node being expanded sits at current_depth.
| Depth | Decay factor | Effect |
|---|---|---|
| Seed (depth 0) | 1.0 (no decay) | Direct vector match — full score |
| Depth 1 neighbours | 0.5^1 = 0.50 | Direct neighbours get half the seed score |
| Depth 2 neighbours | 0.5^2 = 0.25 | 2-hop neighbours get a quarter |
| Depth 3 neighbours | 0.5^3 = 0.125 | 3-hop neighbours get an eighth |
When combine_scores=True, a node that is reachable via multiple paths receives the maximum score from all paths (not the sum), preventing score inflation for hub nodes.
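A short worked calculation makes the max-versus-sum point concrete. The numbers are illustrative only: a hub node is assumed to be one hop from a seed scored 0.8 and two hops from a second seed scored 0.4, and `decayed` is a hypothetical helper for the 0.5^depth factor.

```python
def decayed(seed_score: float, depth: int) -> float:
    """Score offered to a node `depth` hops away from a seed."""
    return seed_score * 0.5 ** depth


# Decay factors for depths 0 to 3, relative to a seed score of 1.0.
factors = [decayed(1.0, depth) for depth in range(4)]
print(factors)

# A hub reached along two different paths.
via_first_seed = decayed(0.8, 1)   # one hop from the stronger seed
via_second_seed = decayed(0.4, 2)  # two hops from the weaker seed

combined = max(via_first_seed, via_second_seed)  # what combine_scores=True keeps
summed = via_first_seed + via_second_seed        # what a sum would have produced
print(combined, summed)
```

Taking the maximum keeps the hub at the score of its best path. A sum would grow with every additional path and could lift a well-connected but weakly matching node above a direct seed.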
Environment Variables Reference
| Variable | Type | Default | Description |
|---|---|---|---|
| GRAPHRAG_K | int | 5 | Default number of retrieval results. |
| GRAPHRAG_CONTEXT_DEPTH | int | 2 | BFS expansion depth. |
| GRAPHRAG_MAX_DEPTH | int | 2 | Maximum graph traversal depth for neighbour queries. |
| GRAPHRAG_EMBEDDER | str | "all-MiniLM-L6-v2" | Sentence-transformer model name. |
| GRAPHRAG_BATCH_SIZE | int | 32 | Embedding batch size. |
| GRAPHRAG_USE_GPU | "true"/"false" | "false" | Enable GPU acceleration for FAISS. |
| GRAPHRAG_LOG_LEVEL | str | "INFO" | Python logging level. |
| GRAPHRAG_HYBRID_SEARCH | "true"/"false" | "false" | Enable hybrid semantic + keyword search. |
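Reading these variables with their documented defaults could look like the sketch below. It is not the implementation of ConfigGraphRAG.from_env: the names `EnvSettings`, `settings_from_env`, and `env_bool` are hypothetical, and treating the boolean strings case-insensitively is an assumption.

```python
import os
from dataclasses import dataclass


def env_bool(name: str, default: bool) -> bool:
    """Parse the documented "true"/"false" strings (case-insensitive in this sketch)."""
    raw = os.environ.get(name)
    return default if raw is None else raw.strip().lower() == "true"


@dataclass
class EnvSettings:
    k: int
    context_depth: int
    max_depth: int
    embedder: str
    batch_size: int
    use_gpu: bool
    log_level: str
    hybrid_search: bool


def settings_from_env() -> EnvSettings:
    """Build settings from GRAPHRAG_* variables, using the defaults from the table."""
    env = os.environ
    return EnvSettings(
        k=int(env.get("GRAPHRAG_K", "5")),
        context_depth=int(env.get("GRAPHRAG_CONTEXT_DEPTH", "2")),
        max_depth=int(env.get("GRAPHRAG_MAX_DEPTH", "2")),
        embedder=env.get("GRAPHRAG_EMBEDDER", "all-MiniLM-L6-v2"),
        batch_size=int(env.get("GRAPHRAG_BATCH_SIZE", "32")),
        use_gpu=env_bool("GRAPHRAG_USE_GPU", False),
        log_level=env.get("GRAPHRAG_LOG_LEVEL", "INFO"),
        hybrid_search=env_bool("GRAPHRAG_HYBRID_SEARCH", False),
    )


# Demo: override two variables, clear one, and read the result.
os.environ["GRAPHRAG_K"] = "8"
os.environ["GRAPHRAG_USE_GPU"] = "true"
os.environ.pop("GRAPHRAG_BATCH_SIZE", None)
settings = settings_from_env()
print(settings.k, settings.use_gpu, settings.batch_size)
```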
Saved Files Layout
<save_path>/
├── vector_db/
│ └── (VectorDatabase internal files — FAISS index, chunk list, etc.)
├── graph.json
│ ├── nodes: [{"id": ..., "content": ..., "node_type": ..., "metadata": ...}, ...]
│ └── edges: [{"source": ..., "target": ..., "relation": ..., "weight": ..., "bidirectional": ...}, ...]
└── mappings.json
├── chunk_to_node: {"entity_Paris": "Paris", ...}
├── node_to_chunks: {"Paris": ["entity_Paris"], ...}
├── node_to_chunks_set: {"Paris": ["entity_Paris"], ...}
    └── doc_contents: {"doc_001": "Full text of the document...", ...}
Complete Examples
Example 1 — Build and query a geography knowledge graph
from fennec_community.rag.types.graph_rag import GraphRAG, ConfigGraphRAG
config = ConfigGraphRAG(k=5, context_depth=2, log_level="INFO")
rag = GraphRAG(vector_db=my_vdb, llm=my_llm, config=config)
# Ingest
rag.add_document_with_relations(
    content="Paris is the capital of France. The Eiffel Tower is a famous landmark in Paris.",
    entities=[
        {"id": "Paris", "text": "Paris", "type": "city"},
        {"id": "France", "text": "France", "type": "country"},
        {"id": "EiffelTower", "text": "Eiffel Tower", "type": "landmark"},
    ],
    relations=[
        {"source": "Paris", "target": "France", "type": "capital_of"},
        {"source": "EiffelTower", "target": "Paris", "type": "located_in"},
    ],
    doc_id="geo_001",
)
# Query in English
answer = rag.query("What landmarks are in Paris?", language="en", include_sources=True)
print(answer)
Example 2 — Inspect graph structure
from fennec_community.rag.types.graph_rag import KnowledgeGraph, GraphNode, GraphEdge
kg = KnowledgeGraph()
# Add nodes
kg.add_node(GraphNode("A", "Node A content", "concept"))
kg.add_node(GraphNode("B", "Node B content", "concept"))
kg.add_node(GraphNode("C", "Node C content", "concept"))
# Add edges
kg.add_edge(GraphEdge("A", "B", "relates_to"))
kg.add_edge(GraphEdge("B", "C", "leads_to"))
# Traverse
print(kg.get_neighbors("A", max_depth=2)) # {"B", "C"}
print(kg.find_path("A", "C")) # ["A", "B", "C"]
print(kg.find_all_paths("A", "C", max_length=5))
# Analytics
print(kg.get_stats())
print(kg.get_node_degree("B")) # {"in_degree": 1, "out_degree": 1, "total_degree": 2}
print(kg.validate_integrity()) # {"orphan_edges": [], "duplicate_edges": [], "self_loops": []}
Example 3 — Async usage in FastAPI
from fastapi import FastAPI
from fennec_community.rag.types.graph_rag import GraphRAG, ConfigGraphRAG
app = FastAPI()
config = ConfigGraphRAG.from_env()
rag = GraphRAG(vector_db=my_vdb, llm=my_llm, config=config)
@app.get("/query")
async def answer_query(q: str, lang: str = "en"):
    answer = await rag.aquery(q, language=lang, include_sources=True)
    return {"answer": answer}
@app.get("/retrieve")
async def retrieve(q: str, top_k: int = 5):
    results = await rag.aretrieve(q, top_k=top_k)
    return {"results": results}
@app.get("/node/{node_id}")
async def node_context(node_id: str, depth: int = 2):
    ctx = rag.get_node_context(node_id, max_depth=depth)
    return ctx
Example 4 — Save and load state
from fennec_community.rag.types.graph_rag import GraphRAG, ConfigGraphRAG
# Build and populate
rag = GraphRAG(vector_db=my_vdb, llm=my_llm)
rag.add_document_with_relations(...)
# Save
rag.save("./my_graphrag_state")
# Restore in a new process
rag2 = GraphRAG(vector_db=fresh_vdb, llm=my_llm)
rag2.load("./my_graphrag_state")
answer = rag2.query("Where is the Eiffel Tower?", language="en")
print(answer)
Example 5 — Use with FederatedRAG
import asyncio
from fennec_community.rag.types.graph_rag import GraphRAG
from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod
graph_rag_1 = GraphRAG(vector_db=vdb1, llm=my_llm)
graph_rag_2 = GraphRAG(vector_db=vdb2, llm=my_llm)
# Register both GraphRAG instances as federated sources
fed = FederatedRAG(aggregation_method=AggregationMethod.RANKING)
fed.add_source("knowledge_base_1", graph_rag_1, weight=2.0, timeout=10.0)
fed.add_source("knowledge_base_2", graph_rag_2, weight=1.0, timeout=8.0)
async def main():
    # FederatedRAG calls graph_rag.aretrieve() + graph_rag.agenerate() automatically
    result = await fed.query_async("What is the Eiffel Tower?")
    print(result["answer"])
# query_async is awaited, so it has to run inside an event loop
asyncio.run(main())
Example 6 — Node-level operations
from fennec_community.rag.types.graph_rag import GraphNode
# Create
node = GraphNode(
    id="GEiffel",
    content="Gustave Eiffel was a French civil engineer.",
    node_type="person",
    metadata={"nationality": "French", "born": 1832},
)
# Inspect
print(node.has_embedding) # False
print(node.get_content_hash()) # MD5 hash string
# Serialise
data = node.to_dict(include_embedding=False)
# Deserialise
node2 = GraphNode.from_dict(data)
assert node == node2 # True (equality based on ID)
# Update
node.update_metadata("died", 1923)
node.merge_metadata({"awards": ["Legion of Honour"], "profession": "engineer"})