Fennec Persistence Module
Enterprise-Grade Multi-Backend Persistence Layer for RAG Applications
Table of Contents
- Overview
- System Architecture
- Core Concepts
- Quick Start Guide
- Public API Reference
- Configuration System
- Security Model
- Storage Backends — Deep Dive
- Observability & Metrics
- Edge Cases & Failure Handling
- Advanced Usage
1. Overview
The Fennec Persistence Module is a production-grade, multi-tenant persistence layer designed for RAG (Retrieval-Augmented Generation) applications and LLM pipelines. It unifies four fundamentally different storage backends — key-value, vector, relational, and object storage — behind a single coherent API, with automatic routing, encryption, versioning, access control, and lifecycle management built in.
Why It Exists
RAG systems require at least three distinct storage paradigms simultaneously: a fast key-value store for caching, a vector index for semantic search, and a relational database for structured metadata. Managing each backend independently creates duplication, inconsistent security boundaries, and fragile operational procedures. The Fennec Persistence Module solves this by providing a single entry point — PersistenceManager — that makes the right storage decision automatically based on data type and size, while enforcing uniform encryption, RBAC, and audit logging across all backends.
Real-World Use Cases
| Use Case | How the Module Helps |
|---|---|
| LLM response caching | Route DataCategory.CACHE entries to KeyValueStorage with automatic TTL enforcement |
| Embedding storage & retrieval | Store and search DataCategory.EMBEDDING vectors with per-tenant isolation |
| RAG document ingestion | Persist large document chunks to ObjectStorage; metadata to DatabaseStorage |
| Multi-tenant SaaS | Complete namespace isolation per tenant; per-tenant RBAC, rate limiting, and encryption keys |
| Audit & compliance | Append-only audit log for every READ, WRITE, DELETE, BACKUP operation |
| Disaster recovery | Point-in-time snapshots with one-command restore |
| Data versioning | Full version history with rollback for any stored key |
2. System Architecture
Pipeline Overview
┌─────────────────────────────────────────────────────────────────────┐
│ WRITE PATH │
│ │
│ pm.store(key, value, tenant, category) │
│ │ │
│ ▼ │
│ [DataSanitizer] ← validate key, tenant, value size │
│ │ │
│ ▼ │
│ [AccessControlManager] ← RBAC + policy check + rate limit │
│ │ │
│ ▼ │
│ [StorageRouter] ← route by DataCategory + size → StorageType │
│ │ │
│ ├── EMBEDDING → VectorStorage (FAISS / pgvector) │
│ ├── DOCUMENT → ObjectStorage (local / S3) │
│ ├── CACHE → KeyValueStorage (in-memory / Redis) │
│ └── METADATA → DatabaseStorage (SQLite / PostgreSQL) │
│ │
│ [EncryptionEngine] ← AES-256-GCM, per-tenant derived key │
│ │ │
│ ▼ │
│ [VersionManager] ← SHA-256 checksum + version chain │
│ │ │
│ ▼ │
│ [Backend.set()] ← atomic write to chosen backend │
│ │ │
│ ▼ │
│ [DatabaseStorage] ← metadata mirrored to relational DB │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ READ PATH │
│ │
│ pm.retrieve(key, tenant, category) │
│ │ │
│ ▼ │
│ [DataSanitizer + AccessControlManager] │
│ │ │
│ ▼ │
│ [StorageRouter] ← same routing logic as write path │
│ │ │
│ ▼ │
│ [Backend.get()] ← TTL checked, access_count incremented │
│ │ │
│ ▼ │
│ [EncryptionEngine.decrypt()] ← only if blob is encrypted │
│ │ │
│ ▼ │
│ PersistenceResult → returned to caller │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ BACKGROUND SERVICES │
│ │
│ LifecycleManager │
│ ├── TTL reaper (KeyValueStorage: 1-second sweep) │
│ └── Lifecycle sweep (configurable interval, default 60s) │
│ │
│ TransactionManager │
│ └── 2PC-lite: stage → execute → compensate on failure │
└─────────────────────────────────────────────────────────────────────┘
Component Responsibilities
| Component | Responsibility |
|---|---|
| PersistenceManager | Single entry point. Wires all subsystems. Exposes store, retrieve, delete, vector_search, batch_*, transaction, backup, and versioning APIs. |
| StorageRouter | Evaluates the data category and payload size against a prioritised rule set to select the correct StorageType and DataTier. |
| VersionManager | Maintains an in-memory SHA-256 version chain per scoped key. Supports rollback to any historical version still retained (the last max_versions). |
| BackupManager | Creates and restores pickle-serialised point-in-time snapshots. Writes a manifest.json alongside each snapshot. |
| TransactionManager | Two-phase-commit-lite across multiple backends. Stages operations, executes them in order, and runs compensating deletes if any operation fails. |
| LifecycleManager | Periodic background sweep enforcing TTL expiry and data tier recommendations. Exposes storage cost estimates. |
| EncryptionEngine | AES-256-GCM symmetric encryption. Derives a unique key per tenant from the master key using PBKDF2-SHA256. Produces self-describing blobs (version byte + nonce + ciphertext + tag). |
| AccessControlManager | RBAC with per-tenant policies, operation-level deny lists, and token-bucket rate limiting. Records every decision to an in-process audit log. |
| DataSanitizer | Input validation: key format, key length (≤512 chars), value size (≤100 MB), and path traversal prevention on both keys and tenant IDs. |
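The table says AccessControlManager rate-limits with a token bucket but gives no parameters. As a rough sketch of that technique, assuming one bucket per (tenant_id, namespace) pair and illustrative capacity and refill values, a bucket refills continuously and admits a request only when a whole token is available. TokenBucket and check_rate are hypothetical names, not part of the module's API.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class TokenBucket:
    """Hypothetical token bucket: holds up to `capacity` tokens, refilled at `rate` per second."""

    def __init__(self, capacity: float, rate: float,
                 clock: Optional[Callable[[], float]] = None) -> None:
        self.capacity = capacity
        self.rate = rate
        self._clock = clock or time.monotonic  # injectable clock for testing
        self._tokens = capacity                # start with a full bucket
        self._last = self._clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self._clock()
        # Refill in proportion to elapsed time, capped at capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


# Hypothetical per-tenant registry: one bucket per (tenant_id, namespace).
_buckets: Dict[Tuple[str, str], TokenBucket] = {}


def check_rate(tenant_id: str, namespace: str,
               capacity: float = 5, rate: float = 1.0) -> bool:
    key = (tenant_id, namespace)
    if key not in _buckets:
        _buckets[key] = TokenBucket(capacity, rate)
    return _buckets[key].allow()
```

Because each tenant has its own bucket, a burst from one tenant cannot exhaust another tenant's allowance.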
Design Philosophy
- One API, four backends. All backends share the same BasePersistence contract, making them interchangeable without caller changes.
- Automatic routing. Callers declare what they are storing (DataCategory), not where. The router decides.
- Tenant-first isolation. Every key is scoped as {tenant_id}::{namespace}::{key}. Cross-tenant reads are architecturally impossible through the public API.
- Defense in depth. Validation → authorisation → encryption → versioning, in that order, always.
- Production swap paths. Every backend has a documented one-line production replacement: in-memory dict → Redis, local filesystem → S3, SQLite → PostgreSQL, NumPy index → FAISS/pgvector.
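The scoping rule can be sketched in a few lines. TenantScope below is a hypothetical stand-in for the key-scoping helpers on TenantContext, not the module's class. It also rejects ":" inside tenant_id and namespace, an added assumption: without it, a tenant "a:" with namespace "b" and a tenant "a" with namespace ":b" would both produce the prefix "a:::b".

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TenantScope:
    """Hypothetical stand-in for TenantContext's key-scoping helpers."""
    tenant_id: str
    namespace: str

    def __post_init__(self) -> None:
        # Assumption: forbid ':' in the scope parts so that
        # "{tenant_id}::{namespace}::" can always be split unambiguously.
        for part in (self.tenant_id, self.namespace):
            if not part or ":" in part:
                raise ValueError(f"invalid scope component: {part!r}")

    def storage_prefix(self) -> str:
        return f"{self.tenant_id}::{self.namespace}"

    def scoped_key(self, key: str) -> str:
        return f"{self.storage_prefix()}::{key}"


backend = {}
acme = TenantScope("acme", "prod")
backend[acme.scoped_key("doc:001")] = b"acme data"
# Another tenant asking for the same logical key resolves to a different physical key.
print(TenantScope("globex", "prod").scoped_key("doc:001") in backend)  # False
```

Keys themselves may still contain colons (doc:001). Only the tenant and namespace parts need the restriction, because the key is always the last component.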
3. Core Concepts
3.1 Data Categories and Routing
DataCategory is the primary routing signal. Each value maps to a default backend:
| DataCategory | Default Backend | Default Tier |
|---|---|---|
| EMBEDDING | VectorStorage | HOT |
| DOCUMENT | ObjectStorage | WARM |
| CACHE | KeyValueStorage | HOT |
| METADATA | DatabaseStorage | HOT |
| CHECKPOINT | DatabaseStorage | HOT |
| AUDIT | DatabaseStorage | HOT |
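Payloads larger than 1 MB are routed to ObjectStorage for every category except EMBEDDING. The embedding_to_vector rule (priority 10) is evaluated before the size-based document_to_object rule (priority 20), so embeddings always go to VectorStorage.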
3.2 Data Tiers
DataTier describes the cost/performance trade-off:
| Tier | Relative Cost/GB/Month | Use Case |
|---|---|---|
| HOT | 1.0× | Frequently accessed data (embeddings, caches) |
| WARM | 0.3× | Documents accessed a few times per day |
| COLD | 0.05× | Rarely accessed archives |
| FROZEN | 0.005× | Long-term audit retention |
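The relative costs combine with size and duration. The following is a minimal sketch that assumes a linear model (cost = multiplier × GB × months). This model reproduces the 95% HOT-to-COLD saving quoted under estimate_storage_cost. Tier and estimate_cost are illustrative names, not the module's DataTier or API.

```python
from enum import Enum


class Tier(Enum):
    # Relative cost per GB per month, taken from the tier table (HOT = 1.0).
    HOT = 1.0
    WARM = 0.3
    COLD = 0.05
    FROZEN = 0.005


def estimate_cost(tier: Tier, size_gb: float, months: float = 1.0) -> float:
    """Relative cost units under an assumed linear model."""
    return tier.value * size_gb * months


hot = estimate_cost(Tier.HOT, size_gb=100, months=12)    # 1200 units
cold = estimate_cost(Tier.COLD, size_gb=100, months=12)  # 60 units
print(f"HOT -> COLD saves {(hot - cold) / hot:.0%}")      # HOT -> COLD saves 95%
```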
3.3 TenantContext
Every operation requires a TenantContext. It carries the isolation namespace and RBAC roles:
from fennec_memory.persistence import TenantContext
tenant = TenantContext(
    tenant_id="acme",       # required: org/account identifier
    namespace="prod",       # required: environment/project qualifier
    user_id="alice",        # optional: for audit trails and version authorship
    session_id="sess_001",  # optional: for session-scoped audit
    roles=["writer"],       # RBAC roles applied to this operation
)
# Derived helpers
tenant.storage_prefix()        # → "acme::prod"
tenant.scoped_key("doc:001")   # → "acme::prod::doc:001"
3.4 PersistenceResult
All single-item operations return a PersistenceResult:
@dataclass
class PersistenceResult:
    success: bool
    key: str
    operation: OperationType            # READ | WRITE | DELETE | UPDATE | LIST
    storage_type: Optional[StorageType]
    data: Optional[Any]                 # The retrieved value (on READ)
    metadata: Optional[StorageMetadata]
    error: Optional[str]                # Error message if success=False
    latency_ms: float
    version: Optional[DataVersion]
    from_cache: bool
Factory methods:
PersistenceResult.ok(key, operation, data=..., latency_ms=..., ...)
PersistenceResult.fail(key, operation, error="reason")
3.5 BatchResult
All batch operations return a BatchResult:
@dataclass
class BatchResult:
    total: int
    succeeded: int
    failed: int
    results: List[PersistenceResult]
    errors: Dict[str, str]              # key → error message for failed items
    latency_ms: float
# Computed properties on an instance (batch):
batch.success_rate    # float: succeeded / total
batch.all_succeeded   # bool
3.6 Version Chain
Every store() call records a DataVersion linked to its predecessor:
@dataclass
class DataVersion:
    version_id: str                 # UUID
    version_num: int                # monotonically increasing per key
    created_at: datetime
    created_by: Optional[str]       # tenant.user_id
    checksum: Optional[str]         # SHA-256 of raw bytes
    parent_version: Optional[str]   # previous version_id
    tags: Dict[str, str]
3.7 Multi-Tenancy Isolation
Isolation is enforced at three levels:
- Key scoping: All storage writes and reads use tenant.scoped_key(key) as the actual storage key. A caller cannot read another tenant's data by guessing keys.
- Vector index isolation: VectorStorage maintains a separate VectorIndex instance per tenant.storage_prefix(). Semantic searches are therefore confined to one tenant namespace.
- Encryption key isolation: EncryptionEngine derives a unique AES-256 key per tenant_id using PBKDF2-SHA256. Decrypting another tenant's ciphertext with the wrong derived key fails authentication (GCM tag verification).
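Per-tenant key derivation can be illustrated with the standard library alone. This sketch makes several assumptions. It builds the salt from the tenant_id and uses an arbitrary iteration count, because the module's actual salt and iteration count are not documented here. It also splits a blob using the standard 12-byte GCM nonce and 16-byte tag, which is likewise an assumption about the module's blob layout. derive_tenant_key and split_blob are hypothetical names.

```python
import hashlib
from typing import NamedTuple

NONCE_LEN = 12  # assumed: 96-bit GCM nonce
TAG_LEN = 16    # assumed: full 128-bit GCM tag


def derive_tenant_key(master_key: bytes, tenant_id: str,
                      iterations: int = 200_000) -> bytes:
    """Derive a 32-byte (AES-256) key per tenant with PBKDF2-HMAC-SHA256."""
    salt = b"fennec-tenant:" + tenant_id.encode("utf-8")  # assumed salt scheme
    return hashlib.pbkdf2_hmac("sha256", master_key, salt, iterations, dklen=32)


class Blob(NamedTuple):
    version: int
    nonce: bytes
    ciphertext: bytes
    tag: bytes


def split_blob(blob: bytes) -> Blob:
    """Split a version byte + nonce + ciphertext + tag blob into its parts."""
    if len(blob) < 1 + NONCE_LEN + TAG_LEN:
        raise ValueError("blob too short")
    return Blob(
        version=blob[0],
        nonce=blob[1:1 + NONCE_LEN],
        ciphertext=blob[1 + NONCE_LEN:-TAG_LEN],
        tag=blob[-TAG_LEN:],
    )
```

Different tenants get different salts and therefore different keys, so a GCM decryption attempted with the wrong tenant's key fails tag verification. In the cryptography package, AESGCM.decrypt raises InvalidTag in that case. AESGCM.encrypt also returns the ciphertext with the 16-byte tag already appended, which matches the ciphertext + tag tail of the layout above.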
4. Quick Start Guide
Installation
pip install fennec-memory
pip install aiosqlite cryptography numpy
# Production extras:
pip install aioredis faiss-cpu aioboto3 asyncpg
Minimal Working Example
import asyncio
from fennec_memory.persistence import PersistenceManager, TenantContext, DataCategory
async def main():
    # 1. Create and initialize
    pm = await PersistenceManager.create()
    # 2. Define a tenant context
    tenant = TenantContext(
        tenant_id="acme",
        namespace="prod",
        user_id="alice",
        roles=["writer"],
    )
    # 3. Store a document
    result = await pm.store(
        key="doc:001",
        value=b"My RAG document content...",
        tenant=tenant,
        category=DataCategory.DOCUMENT,
        ttl=3600,
    )
    print(result.success, result.version.version_num)
    # 4. Retrieve it
    result = await pm.retrieve("doc:001", tenant, category=DataCategory.DOCUMENT)
    print(result.data[:40])
    # 5. Shut down
    await pm.shutdown()
asyncio.run(main())
Context Manager Pattern (Recommended)
# Inside an async function:
async with await PersistenceManager.create() as pm:
    tenant = TenantContext(tenant_id="acme", namespace="prod", roles=["writer"])
    await pm.store("cache:response:123", b"cached answer", tenant,
                   category=DataCategory.CACHE, ttl=300)
Store → Retrieve → Delete Pattern
# create() is a coroutine, so it must be awaited before entering the context.
async with await PersistenceManager.create() as pm:
    # Store
    await pm.store("key", value, tenant, category=DataCategory.METADATA)
    # Retrieve
    result = await pm.retrieve("key", tenant, category=DataCategory.METADATA)
    if result.success:
        data = result.data
    else:
        print(f"Miss: {result.error}")
    # Delete
    await pm.delete("key", tenant, category=DataCategory.METADATA)
5. Public API Reference
5.1 PersistenceManager
The single entry point for normal usage. Every operation also takes a TenantContext and usually a DataCategory.
from fennec_memory.persistence import PersistenceManager, PersistenceManagerConfig
# Full import paths (for direct subsystem access):
from fennec_memory.persistence import PersistenceManager, PersistenceManagerConfig
from fennec_memory.persistence import TenantContext, DataCategory, DataTier, StorageConfig, StorageType
from fennec_memory.persistence import AccessPolicy, Permission, EncryptionEngine
from fennec_memory.persistence import RoutingRule
Factory / Lifecycle
PersistenceManager.create
@classmethod
async def create(cls, config: Optional[PersistenceManagerConfig] = None) -> PersistenceManager
Creates, configures, and fully initialises a PersistenceManager. Calls initialize() internally. Returns a ready-to-use instance.
| Parameter | Type | Required | Description |
|---|---|---|---|
| config | PersistenceManagerConfig | No | Full configuration object. If None, all defaults are used (in-memory backends, dev encryption key). |
# With defaults (dev/test)
pm = await PersistenceManager.create()
# With custom config
import os
from fennec_memory.persistence import PersistenceManagerConfig, StorageConfig, StorageType
config = PersistenceManagerConfig(
    db_config=StorageConfig(
        storage_type=StorageType.RELATIONAL,
        extra={"db_path": "/var/data/rag.db"},
    ),
    master_key=os.environ["MASTER_KEY"].encode(),
    enable_encryption=True,
)
pm = await PersistenceManager.create(config)
initialize
async def initialize(self) -> None
Connects all four backends concurrently and starts the LifecycleManager background sweep. Idempotent: safe to call multiple times.
shutdown
async def shutdown(self) -> None
Stops the lifecycle background task and disconnects all backends gracefully. Always call this (or use the async context manager) to avoid resource leaks.
health_check
async def health_check(self) -> Dict[str, bool]
Returns per-backend reachability status.
health = await pm.health_check()
# {"key_value": True, "vector": True, "relational": True, "object": True}
Core Store / Retrieve Operations
store
Universal write operation. Automatically routes to the correct backend, encrypts, versions, and enforces access control.
async def store(
self,
key: str,
value: Any,
tenant: TenantContext,
category: DataCategory = DataCategory.DOCUMENT,
ttl: Optional[int] = None,
tags: Optional[Dict[str, str]] = None,
tier: Optional[DataTier] = None,
encrypt: bool = True,
size_hint: int = 0,
) -> PersistenceResult
| Parameter | Type | Required | Description |
|---|---|---|---|
| key | str | Yes | Storage key. Max 512 chars. No null bytes, \r, \n, ../, or ..\\. |
| value | Any | Yes | Data to store. Accepts bytes, str, np.ndarray, or any JSON-serialisable object. |
| tenant | TenantContext | Yes | Tenant isolation context with RBAC roles. |
| category | DataCategory | No | Determines which backend is used (default: DOCUMENT). |
| ttl | int | No | Time-to-live in seconds. None means no expiry, except for CACHE entries, which fall back to config.default_cache_ttl_secs. |
| tags | Dict[str, str] | No | Arbitrary string key-value tags attached to the version record. |
| tier | DataTier | No | Override the router's automatic tier selection. |
| encrypt | bool | No | Encrypt the value before writing (default: True). np.ndarray values are never encrypted. |
| size_hint | int | No | Pre-computed byte size; used for routing size decisions when the value is already serialised. |
Returns: PersistenceResult with success=True, the version record, and measured latency_ms.
Internal steps:
- Key and tenant validated by DataSanitizer.
- AccessControlManager.require_permission(WRITE) raises PermissionError on denial.
- Value serialised to bytes (JSON for objects, tobytes() for arrays).
- DataSanitizer.validate_value_size() rejects values >100 MB.
- StorageRouter.route(category, size_bytes) selects StorageType and DataTier.
- AES-256-GCM encryption applied (unless encrypt=False or the value is an np.ndarray).
- VersionManager.record() creates a SHA-256-checksummed version entry.
- StorageMetadata assembled with all routing, TTL, tier, and version information.
- Chosen backend's set() called.
- Metadata mirrored to DatabaseStorage (unless the primary backend already is DatabaseStorage).
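The DataSanitizer checks in the first and fourth steps can be sketched as plain validation functions using the documented limits. Two details are assumptions. Empty keys are rejected. "100 MB" is read as 100 × 1024 × 1024 bytes. The function names are hypothetical.

```python
MAX_KEY_LEN = 512
MAX_VALUE_BYTES = 100 * 1024 * 1024          # assumed: "100 MB" read as 100 MiB
FORBIDDEN_SEQUENCES = ("\x00", "\r", "\n", "../", "..\\")


def validate_key(key: str) -> None:
    """Raise ValueError if `key` breaks the documented key rules."""
    if not key:                               # assumption: empty keys are rejected
        raise ValueError("key must be non-empty")
    if len(key) > MAX_KEY_LEN:
        raise ValueError(f"key longer than {MAX_KEY_LEN} characters")
    for seq in FORBIDDEN_SEQUENCES:
        if seq in key:
            raise ValueError(f"key contains forbidden sequence {seq!r}")


def validate_value_size(data: bytes) -> None:
    """Raise ValueError if the serialised value exceeds the size limit."""
    if len(data) > MAX_VALUE_BYTES:
        raise ValueError("value exceeds the 100 MB limit")
```

Running the size check after serialisation, as the steps above do, means the limit applies to the bytes actually written rather than to the in-memory object.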
# Store a document
result = await pm.store(
key="doc:annual_report_2024",
value=b"<report content>",
tenant=tenant,
category=DataCategory.DOCUMENT,
ttl=86400,
tags={"source": "upload", "year": "2024"},
)
print(f"Stored version {result.version.version_num}, "
f"checksum {result.version.checksum[:16]}…")
# Store JSON metadata
await pm.store(
key="meta:session:001",
value={"query_count": 42, "last_query": "explain decorators"},
tenant=tenant,
category=DataCategory.METADATA,
)
retrieve
Universal read operation. Determines the correct backend, fetches, and decrypts the value.
async def retrieve(
self,
key: str,
tenant: TenantContext,
category: Optional[DataCategory] = None,
decrypt: bool = True,
) -> PersistenceResult
| Parameter | Type | Required | Description |
|---|---|---|---|
| key | str | Yes | The key to retrieve. |
| tenant | TenantContext | Yes | Must have READ permission. |
| category | DataCategory | No | Selects the backend to read from; when None, routing falls back to DOCUMENT. Pass the same category that was used with store() so the read is routed to the same backend. |
| decrypt | bool | No | Attempt AES-256-GCM decryption of the retrieved bytes (default: True). |
Returns: PersistenceResult. On miss: success=False, error="Key not found". On TTL expiry: error="Key expired".
result = await pm.retrieve("doc:annual_report_2024", tenant,
                           category=DataCategory.DOCUMENT)
if result.success:
    content = result.data  # bytes
    print(f"From cache: {result.from_cache}")
else:
    print(f"Not found: {result.error}")
delete
Remove a key from its backend.
async def delete(
self,
key: str,
tenant: TenantContext,
category: DataCategory = DataCategory.DOCUMENT,
) -> PersistenceResult
Requires DELETE permission. Routes to the same backend as store and retrieve.
Vector Operations
store_embedding
Store a single embedding vector with optional metadata.
async def store_embedding(
self,
key: str,
vector: np.ndarray,
tenant: TenantContext,
meta: Optional[Dict[str, Any]] = None,
ttl: Optional[int] = None,
) -> PersistenceResult
| Parameter | Type | Required | Description |
|---|---|---|---|
| key | str | Yes | Unique identifier for this embedding (e.g., "chunk:doc001:para3"). |
| vector | np.ndarray | Yes | 1-D float32 array. Length must match config.vector_config.extra["embedding_dim"] (default: 1536). |
| tenant | TenantContext | Yes | Requires WRITE on DataCategory.EMBEDDING. |
| meta | Dict[str, Any] | No | Arbitrary metadata stored alongside the vector (e.g., source document ID, chunk index). |
| ttl | int | No | TTL in seconds. |
Returns: PersistenceResult with storage_type=StorageType.VECTOR.
Dimension mismatch: If vector.shape[0] does not match the configured embedding dimension, store_embedding returns PersistenceResult.fail(...) with a descriptive error. No exception is raised.
import numpy as np
embedding = np.random.rand(1536).astype(np.float32)
result = await pm.store_embedding(
key="chunk:doc001:para3",
vector=embedding,
tenant=tenant,
meta={"doc_id": "doc:001", "chunk_index": 3, "text": "first 100 chars..."},
)
vector_search
Semantic similarity search over stored embeddings for a specific tenant.
async def vector_search(
self,
query_vector: np.ndarray,
tenant: TenantContext,
k: int = 10,
filter_meta: Optional[Dict[str, Any]] = None,
) -> List[Tuple[str, float, Dict[str, Any]]]
| Parameter | Type | Required | Description |
|---|---|---|---|
| query_vector | np.ndarray | Yes | Query embedding. Must match the configured dimension. |
| tenant | TenantContext | Yes | Requires READ on DataCategory.EMBEDDING. |
| k | int | No | Number of results to return (default: 10). |
| filter_meta | Dict[str, Any] | No | Post-filter results by exact metadata key-value matches (AND semantics: all fields must match). Because filtering happens after the search, fewer than k results may be returned. |
Returns: List[Tuple[key, cosine_score, metadata]], sorted by descending similarity.
query_vec = embed("What are Python decorators?") # your embed function
results = await pm.vector_search(
query_vector=query_vec,
tenant=tenant,
k=5,
filter_meta={"doc_id": "doc:001"},
)
for key, score, meta in results:
    print(f"{score:.4f} {key} chunk={meta['chunk_index']}")
Batch Operations
batch_store
Store multiple items, chunked to avoid backend overload.
async def batch_store(
self,
items: Dict[str, Any],
tenant: TenantContext,
category: DataCategory = DataCategory.DOCUMENT,
ttl: Optional[int] = None,
chunk_size: Optional[int] = None,
) -> BatchResult
| Parameter | Type | Required | Description |
|---|---|---|---|
| items | Dict[str, Any] | Yes | Mapping of {key: value} pairs to store. |
| tenant | TenantContext | Yes | Requires WRITE permission. |
| category | DataCategory | No | Applied uniformly to all items. |
| ttl | int | No | Applied uniformly to all items. |
| chunk_size | int | No | Items per concurrent chunk. Defaults to config.batch_chunk_size (100). |
Concurrent within each chunk; chunks are sequential. Individual item failures do not abort the batch.
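The chunking behaviour can be sketched with asyncio.gather. Each chunk's items run concurrently, and the next chunk starts only after the current one finishes. Failures are collected per key rather than aborting the batch. chunked_batch and its op callback are hypothetical; the real method returns a BatchResult.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple


async def chunked_batch(
    items: Dict[str, Any],
    op: Callable[[str, Any], Awaitable[None]],
    chunk_size: int = 100,
) -> Tuple[int, Dict[str, str]]:
    """Apply `op` to every item; return (succeeded, {key: error})."""
    keys = list(items)
    errors: Dict[str, str] = {}
    succeeded = 0
    for start in range(0, len(keys), chunk_size):
        chunk = keys[start:start + chunk_size]
        # return_exceptions=True returns failures as results instead of raising the first one.
        outcomes = await asyncio.gather(
            *(op(key, items[key]) for key in chunk), return_exceptions=True
        )
        for key, outcome in zip(chunk, outcomes):
            if isinstance(outcome, BaseException):
                errors[key] = str(outcome)
            else:
                succeeded += 1
    return succeeded, errors
```

Dividing succeeded by len(items) corresponds to BatchResult.success_rate. Chunking caps the number of in-flight backend calls at chunk_size, which keeps a large batch from overloading a backend.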
documents = {
"doc:001": b"content one",
"doc:002": b"content two",
"doc:003": b"content three",
}
batch = await pm.batch_store(documents, tenant, category=DataCategory.DOCUMENT)
print(f"{batch.succeeded}/{batch.total} stored in {batch.latency_ms:.1f}ms")
batch_retrieve
Retrieve multiple keys concurrently from a single backend.
async def batch_retrieve(
self,
keys: List[str],
tenant: TenantContext,
category: DataCategory = DataCategory.DOCUMENT,
) -> BatchResult
batch = await pm.batch_retrieve(["doc:001", "doc:002", "doc:003"], tenant)
for result in batch.results:
    if result.success:
        process(result.key, result.data)  # process() stands in for your own handler
Version History
get_version_history
Return the version chain for a key.
def get_version_history(
self,
key: str,
tenant: TenantContext,
limit: int = 20,
) -> List[VersionRecord]
Returns up to limit VersionRecord objects in reverse-chronological order (latest first). The in-memory VersionManager retains the last config.max_versions (default: 50) versions per key.
VersionRecord fields:
| Field | Type | Description |
|---|---|---|
| version_id | str | UUID for the version |
| version_num | int | Monotonically increasing per key (1, 2, 3, …) |
| checksum | str | SHA-256 of the stored data |
| parent_id | Optional[str] | version_id of the preceding version |
| created_at | datetime | Creation timestamp |
| created_by | Optional[str] | tenant.user_id at write time |
| size_bytes | int | Serialised size of the value |
| tags | Dict[str, str] | Tags supplied at write time |
history = pm.get_version_history("doc:001", tenant, limit=5)
for v in history:
    print(f"v{v.version_num} {v.version_id[:8]} "
          f"{v.created_at.isoformat()} {v.size_bytes}B")
rollback_to_version
Restore a key to a specific historical version.
async def rollback_to_version(
self,
key: str,
tenant: TenantContext,
version_id: str,
category: DataCategory = DataCategory.DOCUMENT,
) -> PersistenceResult
Requires WRITE permission. Looks up the raw snapshot bytes for version_id in VersionManager, then calls store() with encrypt=False (the snapshot is already in its original form). Returns PersistenceResult.fail if version_id is not found in history (for example, because it was pruned beyond max_versions).
result = await pm.rollback_to_version("doc:001", tenant,
version_id="3f2a1b9c-...")
print(f"Rolled back to v{result.version.version_num}")
Backup
create_backup
Create a point-in-time snapshot of all keys for a tenant on a specific backend.
async def create_backup(
self,
tenant: TenantContext,
storage_type: StorageType = StorageType.RELATIONAL,
prefix: str = "",
tags: Optional[Dict[str, str]] = None,
) -> Snapshot
| Parameter | Type | Required | Description |
|---|---|---|---|
| tenant | TenantContext | Yes | Requires BACKUP permission (role: backup_op or admin). |
| storage_type | StorageType | No | Which backend to snapshot (default: RELATIONAL). |
| prefix | str | No | Only snapshot keys with this prefix. "" = all keys. |
| tags | Dict[str, str] | No | Metadata tags attached to the snapshot manifest. |
Returns: Snapshot with fields: snapshot_id, tenant_id, namespace, created_at, key_count, size_bytes, storage_path, storage_type, tags.
The snapshot is written to {config.backup_base_path}/{snapshot_id}/data.pkl with a manifest.json sidecar.
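The on-disk layout described above (data.pkl plus a manifest.json sidecar in a per-snapshot directory) and the overwrite flag of restore_backup can be sketched with the standard library. The manifest fields shown are a subset of the Snapshot fields. create_snapshot and restore_snapshot are hypothetical helpers that work on a plain dict rather than a real backend.

```python
import json
import pickle
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict


def create_snapshot(base: Path, tenant_id: str, data: Dict[str, bytes]) -> str:
    """Write {base}/{snapshot_id}/data.pkl and a manifest.json sidecar; return the id."""
    snapshot_id = str(uuid.uuid4())
    snap_dir = base / snapshot_id
    snap_dir.mkdir(parents=True)
    payload = pickle.dumps(data)
    (snap_dir / "data.pkl").write_bytes(payload)
    manifest = {
        "snapshot_id": snapshot_id,
        "tenant_id": tenant_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "key_count": len(data),
        "size_bytes": len(payload),
    }
    (snap_dir / "manifest.json").write_text(json.dumps(manifest, indent=2))
    return snapshot_id


def restore_snapshot(base: Path, snapshot_id: str, target: Dict[str, bytes],
                     overwrite: bool = True) -> int:
    """Copy snapshot entries into `target`; keep existing keys unless overwrite=True."""
    # pickle.loads can execute arbitrary code: only load snapshots you created.
    data = pickle.loads((base / snapshot_id / "data.pkl").read_bytes())
    written = 0
    for key, value in data.items():
        if overwrite or key not in target:
            target[key] = value
            written += 1
    return written
```

Because unpickling can run arbitrary code, snapshot directories should be writable only by the service that creates them.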
snap = await pm.create_backup(
tenant=tenant,
storage_type=StorageType.RELATIONAL,
tags={"reason": "pre-migration", "env": "prod"},
)
print(f"Snapshot {snap.snapshot_id}: {snap.key_count} keys, {snap.size_bytes} bytes")
restore_backup
Restore a snapshot into a backend.
async def restore_backup(
self,
snapshot_id: str,
tenant: TenantContext,
storage_type: StorageType = StorageType.RELATIONAL,
overwrite: bool = True,
) -> BatchResult
Requires RESTORE permission. If overwrite=False, only keys not already present are written.
batch = await pm.restore_backup(snap.snapshot_id, tenant, overwrite=True)
print(f"Restored {batch.succeeded} keys")
list_backups
def list_backups(self, tenant_id: Optional[str] = None) -> List[Snapshot]
Returns all known snapshots, sorted newest-first. Filter by tenant_id if provided.
Transactions
transaction (context manager)
Multi-operation transaction across any combination of backends, with best-effort atomicity (2PC-lite).
@asynccontextmanager
async def transaction(self) -> AsyncIterator[str]
Usage:
async with pm.transaction() as txn_id:
    await pm.txn_set(txn_id, "key1", value1, tenant, "kv")
    await pm.txn_set(txn_id, "key2", value2, tenant, "db")
    await pm.txn_delete(txn_id, "old_key", tenant, "kv")
# On normal exit, the staged operations are committed in order.
# If the block raises, the staged operations are discarded.
On a clean exit, the context manager calls TransactionManager.commit(), which executes the staged operations in order. If one of them fails partway, commit() runs compensating operations (a delete for each set that already succeeded) in reverse order. If the block itself raises, the context manager calls TransactionManager.rollback(), which marks the transaction as rolled back without executing any staged operation.
Transaction rollback limitation: If a staged operation is a delete, its data cannot be recovered by compensating operations (best-effort only). Avoid using txn_delete inside sensitive transactions where full atomicity is required; use a txn_set with a sentinel value instead.
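The commit-and-compensate behaviour can be sketched over plain dicts standing in for backends. The operation tuple format and commit() below are hypothetical. The sketch shows why staged sets can be undone while staged deletes cannot.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical staged operation: (kind, backend_name, key, value)
Op = Tuple[str, str, str, Any]


def commit(ops: List[Op], backends: Dict[str, Dict[str, Any]]) -> bool:
    """Execute staged ops in order; on failure, undo completed sets in reverse."""
    done: List[Op] = []
    try:
        for op in ops:
            kind, backend_name, key, value = op
            target = backends[backend_name]   # KeyError for an unknown backend
            if kind == "set":
                target[key] = value
            elif kind == "delete":
                del target[key]               # KeyError if the key is absent
            else:
                raise ValueError(f"unknown operation {kind!r}")
            done.append(op)
    except Exception:
        # Compensate newest-first: delete whatever each successful set wrote.
        # Completed deletes are not undone, mirroring the documented limitation.
        for kind, backend_name, key, _ in reversed(done):
            if kind == "set":
                backends[backend_name].pop(key, None)
        return False
    return True
```

A compensating delete also removes any value that a set overwrote. In this sketch, a failed transaction can therefore lose the pre-transaction value of an overwritten key. Restoring that value would require reading and saving the old value before each set.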
txn_set
Stage a set operation within an open transaction.
async def txn_set(
self,
txn_id: str,
key: str,
value: Any,
tenant: TenantContext,
backend_name: str, # "kv" | "vector" | "db" | "object"
metadata: Optional[StorageMetadata] = None,
) -> None
txn_delete
Stage a delete operation within an open transaction.
async def txn_delete(
self,
txn_id: str,
key: str,
tenant: TenantContext,
backend_name: str,
) -> None
Backend name mapping:
| Name | Backend |
|---|---|
| "kv" | KeyValueStorage |
| "vector" | VectorStorage |
| "db" | DatabaseStorage |
| "object" | ObjectStorage |
ACL & Audit
set_access_policy
Set a fine-grained access policy for a tenant namespace.
def set_access_policy(self, policy: AccessPolicy) -> None
See Section 7 for full policy configuration.
get_audit_log
Retrieve the in-process audit log.
def get_audit_log(
self,
tenant_id: Optional[str] = None,
limit: int = 1000,
) -> List[Dict[str, Any]]
Returns the last limit audit entries, optionally filtered by tenant_id. Each entry is a dict with keys: ts, tenant_id, namespace, user_id, op, category, granted, reason.
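The entry shape and the tenant_id/limit filtering can be sketched as a small in-memory class. AuditLog is hypothetical and only mirrors the documented keys. An in-process log like this one is lost when the process restarts, so durable compliance retention needs the entries shipped to external storage.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


class AuditLog:
    """Hypothetical in-process, append-only audit log using the documented entry keys."""

    def __init__(self) -> None:
        self._entries: List[Dict[str, Any]] = []

    def record(self, tenant_id: str, namespace: str, user_id: Optional[str],
               op: str, category: str, granted: bool, reason: str = "") -> None:
        self._entries.append({
            "ts": datetime.now(timezone.utc).isoformat(),
            "tenant_id": tenant_id,
            "namespace": namespace,
            "user_id": user_id,
            "op": op,
            "category": category,
            "granted": granted,
            "reason": reason,
        })

    def query(self, tenant_id: Optional[str] = None,
              limit: int = 1000) -> List[Dict[str, Any]]:
        """Return up to `limit` most recent entries, optionally for one tenant."""
        if limit <= 0:
            return []
        rows = [e for e in self._entries
                if tenant_id is None or e["tenant_id"] == tenant_id]
        return rows[-limit:]
```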
Routing Customization
add_routing_rule
Register a custom routing rule at runtime.
def add_routing_rule(self, rule: RoutingRule) -> None
See Section 5.2 for rule configuration.
Metrics
get_metrics
Return per-backend operation counters and latency statistics.
def get_metrics(self) -> Dict[str, Any]
m = pm.get_metrics()
# {
# "key_value": {"reads": 420, "writes": 85, "deletes": 12,
# "errors": 0, "latency_total_ms": 12400.0,
# "avg_latency_ms": 25.3},
# "vector": {...},
# "relational": {...},
# "object": {...},
# "lifecycle_stats": {"sweeps": 14, "evicted": 3, "promoted": 0},
# }
estimate_storage_cost
Calculate relative storage cost for capacity planning.
def estimate_storage_cost(
self,
tier: DataTier,
size_gb: float,
months: float = 1.0,
) -> float
Returns a relative cost unit (HOT = 1.0 per GB/month). Use to compare tier migration savings.
hot_cost = pm.estimate_storage_cost(DataTier.HOT, size_gb=100, months=12)
cold_cost = pm.estimate_storage_cost(DataTier.COLD, size_gb=100, months=12)
savings_pct = (hot_cost - cold_cost) / hot_cost * 100
print(f"Moving 100GB from HOT to COLD saves {savings_pct:.0f}%")
# Moving 100GB from HOT to COLD saves 95%
5.2 StorageRouter
Evaluates a prioritised rule list to select the backend for each operation.
from fennec_memory.persistence import StorageRouter, RoutingRule, DataCategory, StorageType, DataTier
router = StorageRouter()
storage_type, tier = router.route(DataCategory.CACHE)
# → (StorageType.KEY_VALUE, DataTier.HOT)
Default Rules (in priority order)
| Priority | Rule Name | Condition | Target |
|---|---|---|---|
| 10 | embedding_to_vector | category == EMBEDDING | VECTOR |
| 20 | document_to_object | category == DOCUMENT or size > 1 MB | OBJECT |
| 30 | cache_to_kv | category == CACHE | KEY_VALUE |
| 40 | metadata_to_db | category in {METADATA, AUDIT, CHECKPOINT} | RELATIONAL |
| 999 | default_kv | catch-all | KEY_VALUE |
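First-match evaluation over these rules can be sketched as follows. Cat, Target, Rule, and route are simplified stand-ins for DataCategory, StorageType, RoutingRule, and StorageRouter.route, with tiers omitted. The sketch also assumes that 1 MB means 1,048,576 bytes.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class Cat(Enum):
    EMBEDDING = "embedding"
    DOCUMENT = "document"
    CACHE = "cache"
    METADATA = "metadata"
    AUDIT = "audit"
    CHECKPOINT = "checkpoint"


class Target(Enum):
    VECTOR = "vector"
    OBJECT = "object"
    KEY_VALUE = "key_value"
    RELATIONAL = "relational"


ONE_MB = 1024 * 1024  # assumed definition of "1 MB"


@dataclass
class Rule:
    name: str
    priority: int  # lower number = evaluated first
    condition: Callable[[Cat, int, Dict[str, Any]], bool]
    target: Target


RULES: List[Rule] = sorted([
    Rule("embedding_to_vector", 10, lambda c, s, _: c is Cat.EMBEDDING, Target.VECTOR),
    Rule("document_to_object", 20, lambda c, s, _: c is Cat.DOCUMENT or s > ONE_MB, Target.OBJECT),
    Rule("cache_to_kv", 30, lambda c, s, _: c is Cat.CACHE, Target.KEY_VALUE),
    Rule("metadata_to_db", 40,
         lambda c, s, _: c in {Cat.METADATA, Cat.AUDIT, Cat.CHECKPOINT}, Target.RELATIONAL),
    Rule("default_kv", 999, lambda c, s, _: True, Target.KEY_VALUE),
], key=lambda r: r.priority)


def route(cat: Cat, size_bytes: int = 0, extra: Optional[Dict[str, Any]] = None) -> Target:
    """Return the target of the first matching rule in priority order."""
    for rule in RULES:
        if rule.condition(cat, size_bytes, extra or {}):
            return rule.target
    raise LookupError("no rule matched")  # unreachable while default_kv is present


print(route(Cat.EMBEDDING, 5 * ONE_MB))  # Target.VECTOR: priority 10 wins over the size rule
print(route(Cat.CACHE, 5 * ONE_MB))      # Target.OBJECT: size rule (20) precedes cache_to_kv (30)
```

Under this evaluation order, a custom rule registered with a priority below 10 takes effect before every default rule.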
RoutingRule
@dataclass
class RoutingRule:
    name: str
    priority: int                   # lower = higher priority
    condition: Callable[[DataCategory, int, Dict[str, Any]], bool]
    target: StorageType
    tier_hint: DataTier = DataTier.HOT
    description: str = ""
add_rule / route
def add_rule(self, rule: RoutingRule) -> None
def route(self, category: DataCategory, size_bytes: int = 0,
extra: Optional[Dict[str, Any]] = None) -> Tuple[StorageType, DataTier]
def route_with_cost(self, category: DataCategory, size_bytes: int,
access_frequency: float = 1.0) -> Tuple[StorageType, DataTier]
route_with_cost selects the tier based on access_frequency (reads/hour): >100 → HOT, >10 → WARM, >0.1 → COLD, else → FROZEN.
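The route_with_cost thresholds amount to a descending cascade over reads per hour. The sketch below implements that cascade with a hypothetical Tier enum standing in for DataTier.

```python
from enum import Enum


class Tier(Enum):
    HOT = "hot"
    WARM = "warm"
    COLD = "cold"
    FROZEN = "frozen"


def tier_for_frequency(reads_per_hour: float) -> Tier:
    """Pick a tier from access frequency using the documented thresholds."""
    if reads_per_hour > 100:
        return Tier.HOT
    if reads_per_hour > 10:
        return Tier.WARM
    if reads_per_hour > 0.1:
        return Tier.COLD
    return Tier.FROZEN
```

All comparisons are strict, so a key read exactly 100 times per hour lands in WARM, not HOT.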
# Add a rule that sends checkpoint data > 10MB to object storage
pm.add_routing_rule(RoutingRule(
name="large_checkpoint_to_object",
priority=5,
condition=lambda cat, size, _: cat == DataCategory.CHECKPOINT and size > 10_000_000,
target=StorageType.OBJECT,
tier_hint=DataTier.WARM,
description="Large checkpoints go to object storage",
))
5.3 VersionManager
Manages in-memory version history per scoped key.
from fennec_memory.persistence import VersionManager, VersionRecord
| Method | Signature | Description |
|---|---|---|
| record | (scoped_key, data, created_by, tags) -> DataVersion | Create and persist a new version. Prunes the oldest if over max_versions. |
| get_history | (scoped_key, limit) -> List[VersionRecord] | Latest-first version list. |
| get_snapshot | (scoped_key, version_id) -> Optional[bytes] | Raw bytes for a specific version. |
| rollback | (scoped_key, version_id) -> Optional[bytes] | Alias for get_snapshot. |
| latest_version | (scoped_key) -> Optional[VersionRecord] | Most recent VersionRecord. |
| diff_versions | (scoped_key, v1_id, v2_id) -> Dict | Size delta and equality check between two versions. |
VersionRecord fields: version_id, version_num, checksum (SHA-256), parent_id, created_at, created_by, size_bytes, tags.
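The chain behaviour described in this section can be sketched in memory. It covers SHA-256 checksums, parent links, latest-first history, pruning beyond max_versions, and the size_delta/same result of diff_versions. MiniVersionManager and Version are hypothetical simplifications that omit tags and created_by.

```python
import hashlib
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional


@dataclass
class Version:
    version_id: str
    version_num: int
    checksum: str                  # SHA-256 hex digest of the stored bytes
    parent_id: Optional[str]
    size_bytes: int
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class MiniVersionManager:
    """Hypothetical in-memory version chain per scoped key (max_versions >= 1)."""

    def __init__(self, max_versions: int = 50) -> None:
        self.max_versions = max_versions
        self._chains: Dict[str, List[Version]] = {}   # oldest first
        self._snapshots: Dict[str, bytes] = {}        # version_id -> raw bytes

    def record(self, scoped_key: str, data: bytes) -> Version:
        chain = self._chains.setdefault(scoped_key, [])
        prev = chain[-1] if chain else None
        v = Version(
            version_id=str(uuid.uuid4()),
            version_num=prev.version_num + 1 if prev is not None else 1,
            checksum=hashlib.sha256(data).hexdigest(),
            parent_id=prev.version_id if prev is not None else None,
            size_bytes=len(data),
        )
        chain.append(v)
        self._snapshots[v.version_id] = data
        while len(chain) > self.max_versions:          # prune the oldest versions
            self._snapshots.pop(chain.pop(0).version_id, None)
        return v

    def get_history(self, scoped_key: str, limit: int = 20) -> List[Version]:
        return list(reversed(self._chains.get(scoped_key, [])))[:limit]

    def get_snapshot(self, scoped_key: str, version_id: str) -> Optional[bytes]:
        if any(v.version_id == version_id for v in self._chains.get(scoped_key, [])):
            return self._snapshots.get(version_id)
        return None

    def diff_versions(self, scoped_key: str, v1_id: str, v2_id: str) -> Dict[str, object]:
        a = self.get_snapshot(scoped_key, v1_id)
        b = self.get_snapshot(scoped_key, v2_id)
        if a is None or b is None:
            raise KeyError("version not in retained history")
        return {"size_delta": len(b) - len(a), "same": a == b}
```

Once a version is pruned, get_snapshot returns None. This corresponds to the case where rollback_to_version reports a failure.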
diff = pm._versions.diff_versions(
tenant.scoped_key("doc:001"), old_version_id, new_version_id
)
print(f"Size delta: {diff['size_delta']} bytes, identical: {diff['same']}")
5.4 BackupManager
Point-in-time snapshot management.
from fennec_memory.persistence import BackupManager, Snapshot
| Method | Signature | Description |
|---|---|---|
| create_snapshot | (backend, tenant, storage_type, prefix, tags) -> Snapshot | Capture all matching keys into a pickle file + manifest. |
| restore_snapshot | (snapshot_id, backend, tenant, overwrite) -> BatchResult | Restore snapshot data into a backend. |
| list_snapshots | (tenant_id=None) -> List[Snapshot] | All snapshots, newest-first. |
| delete_snapshot | (snapshot_id) -> bool | Remove snapshot files from disk. |
Snapshot fields: snapshot_id, tenant_id, namespace, created_at, key_count, size_bytes, storage_path, storage_type, tags.
5.5 TransactionManager
Cross-backend 2-phase-commit-lite.
from fennec_memory.persistence import TransactionManager
| Method | Signature | Description |
|---|---|---|
| begin | () -> str | Open a new transaction; returns txn_id. |
| add_operation | (txn_id, operation_dict) -> None | Stage one operation. |
| commit | (txn_id) -> bool | Execute all staged ops. On partial failure, runs compensating deletes in reverse. Returns True on full success. |
| rollback | (txn_id) -> bool | Mark transaction as rolled back without executing. |
Use PersistenceManager.transaction() (the async context manager) rather than calling these methods directly.
5.6 LifecycleManager
Background data lifecycle enforcement.
from fennec_memory.persistence import LifecycleManager
| Method | Signature | Description |
|---|---|---|
| start | () -> None | Launch the background sweep task. |
| stop | () -> None | Signal the sweep loop to exit. |
| estimate_cost | (tier, size_gb, months) -> float | Relative storage cost for planning. |
| recommend_tier | (access_count, age_days, size_bytes) -> DataTier | Recommend a tier based on access patterns. |
| get_stats | () -> Dict[str, Any] | Returns {"sweeps": N, "evicted": N, "promoted": N}. |
Sweep interval is set via config.lifecycle_sweep_secs (default: 60 seconds). Each sweep:
- Iterates KeyValueStorage._meta_index for expired entries and deletes them.
- Increments stats["evicted"] for each deleted entry.
Tier recommendation thresholds (recommend_tier):
| Access Rate | Recommended Tier |
|---|---|
| > 10 accesses/day | HOT |
| > 1 access/day | WARM |
| > 0.1 accesses/day | COLD |
| ≤ 0.1 accesses/day | FROZEN |
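The threshold table reads as a simple cascade on average accesses per day. In this sketch the rate is `access_count / age_days`, with ages under one day treated as one day; that guard, the local `DataTier` enum, and ignoring `size_bytes` are assumptions, since the table does not say how the module handles them.

```python
from enum import Enum


class DataTier(Enum):
    # Local stand-in for the module's DataTier enum.
    HOT = "hot"
    WARM = "warm"
    COLD = "cold"
    FROZEN = "frozen"


def recommend_tier(access_count: int, age_days: float, size_bytes: int = 0) -> DataTier:
    """Pick a tier from the average access rate; size_bytes is accepted but unused here."""
    rate = access_count / max(age_days, 1.0)  # accesses per day (assumed age floor of 1 day)
    if rate > 10:
        return DataTier.HOT
    if rate > 1:
        return DataTier.WARM
    if rate > 0.1:
        return DataTier.COLD
    return DataTier.FROZEN
```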
```python
recommendation = pm._lifecycle.recommend_tier(
    access_count=30, age_days=10, size_bytes=500_000
)
# → DataTier.WARM (3 reads/day)
```
5.7 Storage Backends
All four backends implement BasePersistence and share the same method signatures. They can be used directly for advanced scenarios or as replaceable components.
```python
from fennec_memory.persistence import KeyValueStorage, VectorStorage, DatabaseStorage, ObjectStorage
```
BasePersistence — Shared Interface
Every backend implements:
| Method | Description |
|---|---|
| `connect()` | Establish connection(s). |
| `disconnect()` | Gracefully close all connections. |
| `health_check() -> bool` | Reachability check. |
| `get(key, tenant) -> PersistenceResult` | Retrieve with TTL check and access tracking. |
| `set(key, value, tenant, metadata) -> PersistenceResult` | Upsert with versioning. |
| `delete(key, tenant) -> PersistenceResult` | Remove key. |
| `exists(key, tenant) -> bool` | Check existence without loading data. |
| `list_keys(prefix, tenant, limit, offset) -> List[str]` | Paginated key enumeration. |
| `batch_get(keys, tenant) -> BatchResult` | Concurrent multi-get. |
| `batch_set(items, tenant, metadata) -> BatchResult` | Concurrent multi-set. |
| `batch_delete(keys, tenant) -> BatchResult` | Concurrent multi-delete. |
| `expire(key, tenant, ttl_seconds) -> bool` | Set/update TTL (KV only). |
| `get_ttl(key, tenant) -> Optional[int]` | Remaining TTL in seconds (KV only). |
| `get_metrics() -> Dict[str, Any]` | Read/write/delete counts and avg latency. |
VectorStorage-Specific
```python
async def search(
    self,
    query_vector: np.ndarray,
    tenant: TenantContext,
    k: int = 10,
    filter_meta: Optional[Dict[str, Any]] = None,
) -> List[Tuple[str, float, Dict[str, Any]]]: ...
```
Each tenant namespace gets an isolated `VectorIndex` instance. Vectors are L2-normalised, so the inner product used for ranking equals cosine similarity. Results are `(key, score, metadata)` tuples.
DatabaseStorage-Specific
```python
async def get_version_history(self, key, tenant, limit=20) -> List[Dict[str, Any]]: ...
async def restore_version(self, key, tenant, version_id) -> PersistenceResult: ...
```
`DatabaseStorage` persists every version snapshot to a `version_history` table alongside the main `persistence_store` table.
6. Configuration System
PersistenceManagerConfig
```python
import os

from fennec_memory.persistence import PersistenceManagerConfig, StorageConfig, StorageType

config = PersistenceManagerConfig(
    # Backend configs
    kv_config=StorageConfig(storage_type=StorageType.KEY_VALUE),
    vector_config=StorageConfig(
        storage_type=StorageType.VECTOR,
        extra={"embedding_dim": 1536},
    ),
    db_config=StorageConfig(
        storage_type=StorageType.RELATIONAL,
        extra={"db_path": "/var/data/rag_metadata.db"},
    ),
    object_config=StorageConfig(
        storage_type=StorageType.OBJECT,
        extra={"base_path": "/var/data/rag_objects"},
    ),
    # Security
    master_key=bytes.fromhex(os.environ["MASTER_KEY_HEX"]),  # 32 bytes exactly
    enable_encryption=True,
    # Lifecycle
    max_versions=50,
    lifecycle_sweep_secs=60,
    backup_base_path="/var/data/rag_backups",
    # Performance
    default_cache_ttl_secs=3600,
    batch_chunk_size=100,
)
```
Full field reference:
| Field | Type | Default | Description |
|---|---|---|---|
| `kv_config` | `StorageConfig` | in-memory defaults | Key-value backend config. |
| `vector_config` | `StorageConfig` | `embedding_dim=1536` | Vector backend config. |
| `db_config` | `StorageConfig` | `db_path=":memory:"` | Relational backend config. |
| `object_config` | `StorageConfig` | `base_path="/tmp/rag_objects"` | Object storage config. |
| `master_key` | `bytes` | dev key (32B) | Master encryption key. Must be exactly 32 bytes. Read from environment in production. |
| `enable_encryption` | `bool` | `True` | Global encryption toggle. |
| `max_versions` | `int` | `50` | Maximum versions retained per key in `VersionManager`. |
| `lifecycle_sweep_secs` | `int` | `60` | Background sweep interval. |
| `backup_base_path` | `str` | `"/tmp/rag_backups"` | Snapshot storage directory. |
| `default_cache_ttl_secs` | `int` | `3600` | TTL applied to `DataCategory.CACHE` entries when no TTL is specified. |
| `batch_chunk_size` | `int` | `100` | Items per concurrent chunk in `batch_store`. |
StorageConfig
```python
@dataclass
class StorageConfig:
    storage_type: StorageType
    host: str = "localhost"
    port: int = 0
    database: str = ""
    username: str = ""
    password: str = ""
    ssl: bool = False
    pool_size: int = 10
    max_overflow: int = 20
    connect_timeout: int = 5
    command_timeout: int = 30
    retry_attempts: int = 3
    retry_delay_ms: int = 100
    extra: Dict[str, Any] = field(default_factory=dict)
```
`extra` is backend-specific:
- `VectorStorage`: `{"embedding_dim": 1536}`
- `DatabaseStorage`: `{"db_path": "/path/to/db.sqlite"}`
- `ObjectStorage`: `{"base_path": "/path/to/objects"}`
Environment Variable Reference
| Variable | Used By | Description |
|---|---|---|
| `PERSISTENCE_MASTER_KEY` | `EncryptionEngine` | Base64-encoded 32-byte master key. If absent, falls back to a deterministic dev key. Never use the dev key in production. |
Generating a Master Key
```python
from fennec_memory.persistence import EncryptionEngine

key_b64 = EncryptionEngine.generate_master_key()
print(key_b64)  # store in Vault / KMS
```
7. Security Model
7.1 Encryption — AES-256-GCM
EncryptionEngine uses AES-256-GCM (authenticated encryption) with a per-tenant derived key.
Key derivation:
```text
tenant_key = PBKDF2-SHA256(master_key, salt=tenant_id, iterations=100_000, length=32)
```
Derived keys are cached in memory to avoid re-deriving on every operation.
Blob format:
```text
[1 byte: key_version] [12 bytes: GCM nonce] [N bytes: ciphertext + 16-byte GCM tag]
```
The GCM tag authenticates the ciphertext. Any bit-flip or tampered byte causes `decrypt()` to raise an exception before returning data. The version byte enables future key rotation without re-encrypting all blobs.
np.ndarray values (embeddings) are never encrypted — they must be stored as raw floats for the vector index to function.
```python
import base64

from fennec_memory.persistence import EncryptionEngine

# Generate a new master key
master_key_b64 = EncryptionEngine.generate_master_key()
print(f"Save this securely: {master_key_b64}")

# Direct usage
engine = EncryptionEngine(master_key=base64.b64decode(master_key_b64))
blob = engine.encrypt(b"sensitive payload", tenant_id="acme")
data = engine.decrypt(blob, tenant_id="acme")

# With Additional Authenticated Data (AAD): decrypt must receive the same aad
blob = engine.encrypt(b"payload", tenant_id="acme", aad=b"context")
data = engine.decrypt(blob, tenant_id="acme", aad=b"context")

# JSON convenience wrappers
blob = engine.encrypt_json({"key": "value"}, tenant_id="acme")
obj = engine.decrypt_json(blob, tenant_id="acme")
```
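The key derivation and blob layout described above can be reproduced with the standard library. This sketch derives the per-tenant key with `hashlib.pbkdf2_hmac` and packs or unpacks the `[version][nonce][ciphertext+tag]` layout. It assumes `tenant_id` is UTF-8 encoded to form the salt, and it leaves out the AES-GCM step itself, which would come from a library such as `cryptography` (its `AESGCM.encrypt(nonce, data, associated_data)` returns the ciphertext with the 16-byte tag appended). The helper names are hypothetical.

```python
import hashlib
from typing import Tuple

NONCE_LEN = 12
TAG_LEN = 16


def derive_tenant_key(master_key: bytes, tenant_id: str) -> bytes:
    # PBKDF2-SHA256, tenant_id as salt, 100_000 iterations, 32-byte output.
    return hashlib.pbkdf2_hmac("sha256", master_key, tenant_id.encode("utf-8"),
                               100_000, dklen=32)


def pack_blob(key_version: int, nonce: bytes, ciphertext_and_tag: bytes) -> bytes:
    if len(nonce) != NONCE_LEN:
        raise ValueError("GCM nonce must be 12 bytes")
    return bytes([key_version]) + nonce + ciphertext_and_tag


def unpack_blob(blob: bytes) -> Tuple[int, bytes, bytes]:
    # Smallest valid blob: version byte + nonce + tag over an empty plaintext.
    if len(blob) < 1 + NONCE_LEN + TAG_LEN:
        raise ValueError("blob too short to contain version, nonce and tag")
    return blob[0], blob[1:1 + NONCE_LEN], blob[1 + NONCE_LEN:]
```

Since the salt is the tenant ID, two tenants sharing one master key still get unrelated encryption keys.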
7.2 Role-Based Access Control
Built-in roles and their permissions:
| Role | READ | WRITE | DELETE | LIST | BACKUP | RESTORE | ADMIN |
|---|---|---|---|---|---|---|---|
| `reader` | ✓ | | | ✓ | | | |
| `writer` | ✓ | ✓ | | ✓ | | | |
| `editor` | ✓ | ✓ | ✓ | ✓ | | | |
| `backup_op` | ✓ | | | ✓ | ✓ | ✓ | |
| `admin` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Assign roles via `TenantContext.roles`:
```python
tenant = TenantContext(
    tenant_id="acme",
    namespace="prod",
    user_id="alice",
    roles=["editor"],  # can read, write, delete, list
)
```
Multiple roles are combined: a user with `["reader", "backup_op"]` gets the union of both permission sets.
Open-by-default: If `TenantContext.roles` is empty and no `AccessPolicy` is registered for the tenant, the system permits all operations by default. Enforcement activates as soon as at least one role is assigned to any `TenantContext` or an `AccessPolicy` is set for the tenant. Explicitly assign roles or set a policy for all tenants in production.
7.3 Access Policies
Fine-grained policies override role-level permissions:
```python
from fennec_memory.persistence import AccessPolicy, Permission, DataCategory

policy = AccessPolicy(
    tenant_id="acme",
    namespace="prod",
    allowed_ops={Permission.READ, Permission.WRITE, Permission.LIST},
    denied_ops={Permission.DELETE},  # deny delete even for editors
    data_categories={DataCategory.DOCUMENT, DataCategory.CACHE},
    ip_whitelist=["10.0.0.0/8"],  # restrict to internal network
    rate_limit=100,  # max 100 operations per second
)
pm.set_access_policy(policy)
```
`AccessPolicy` fields:
| Field | Type | Default | Description |
|---|---|---|---|
| `tenant_id` | `str` | Required | Target tenant. |
| `namespace` | `str` | Required | Target namespace. |
| `allowed_ops` | `Set[Permission]` | Required | Permitted operations. |
| `denied_ops` | `Set[Permission]` | `set()` | Explicitly denied operations (takes precedence over role grants). |
| `data_categories` | `Set[DataCategory]` | all categories | Restrict policy to specific data categories. |
| `ip_whitelist` | `List[str]` | `[]` | If non-empty, only requests from these CIDR ranges are permitted. |
| `rate_limit` | `Optional[int]` | `None` | Maximum operations per second via token-bucket limiter. |
Authorization decision order:
1. Resolve effective permissions from `tenant.roles`.
2. If `op not in effective_perms`, deny with reason `"insufficient_role"`.
3. If a policy exists and `AccessPolicy.permits(op, category)` is false, deny with reason `"policy_denied"`.
4. If the rate limiter rejects the request, deny with reason `"rate_limited"`.
5. Otherwise, grant and record to the audit log.
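The decision order can be sketched as a single function that returns `(granted, reason)` using the same reason strings as the audit log. The role-to-permission map here is an assumption (the `editor` entry matches the `TenantContext` example above; `reader` and `writer` are inferred), `Policy` is a simplified stand-in for `AccessPolicy`, and audit recording, IP whitelisting, and the open-by-default path are omitted.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, Optional, Set, Tuple

# Assumed role table; substitute the module's actual role definitions.
ROLE_PERMS: Dict[str, Set[str]] = {
    "reader": {"read", "list"},
    "writer": {"read", "write", "list"},
    "editor": {"read", "write", "delete", "list"},
}


@dataclass
class Policy:
    allowed_ops: Set[str]
    denied_ops: Set[str] = field(default_factory=set)
    data_categories: Optional[Set[str]] = None  # None means every category

    def permits(self, op: str, category: str) -> bool:
        if op in self.denied_ops or op not in self.allowed_ops:
            return False
        return self.data_categories is None or category in self.data_categories


def authorize(roles: Iterable[str], op: str, category: str,
              policy: Optional[Policy] = None,
              allow_rate: Callable[[], bool] = lambda: True) -> Tuple[bool, str]:
    # Step 1: union of permissions across all assigned roles.
    effective = set().union(*(ROLE_PERMS.get(r, set()) for r in roles))
    if op not in effective:                                       # step 2
        return False, "insufficient_role"
    if policy is not None and not policy.permits(op, category):  # step 3
        return False, "policy_denied"
    if not allow_rate():                                          # step 4
        return False, "rate_limited"
    return True, "ok"                                             # step 5
```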
7.4 Rate Limiting
RateLimiter uses a token-bucket algorithm. The bucket refills at ops_per_second tokens per second and allows bursts up to capacity operations.
```python
from fennec_memory.persistence import RateLimiter

limiter = RateLimiter(ops_per_second=100)
if limiter.allow():
    # proceed
    pass
```
Rate limiters are instantiated automatically when a policy with `rate_limit` is registered.
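A minimal token bucket with the behaviour described above, written as a standalone class rather than the module's `RateLimiter`. Defaulting the capacity to one second's worth of tokens is an assumption, and the injectable `clock` exists only to make the refill logic easy to test.

```python
import time
from typing import Callable, Optional


class TokenBucket:
    """Refill at `rate` tokens per second, holding at most `capacity` tokens."""

    def __init__(self, rate: float, capacity: Optional[float] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        # Assumption: allow a burst of one second's worth of operations by default.
        self.capacity = capacity if capacity is not None else rate
        self.tokens = self.capacity
        self.clock = clock
        self.last = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Credit tokens for the elapsed time, never exceeding capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

Using `time.monotonic` rather than wall-clock time keeps the bucket correct if the system clock is adjusted.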
7.5 Input Validation (DataSanitizer)
DataSanitizer is applied before every store() and retrieve() call. It can also be used directly:
```python
from fennec_memory.persistence import DataSanitizer

# Validate and clean a key (raises ValueError on violation)
clean_key = DataSanitizer.validate_key("my::key")

# Validate value size (raises ValueError if > 100 MB); serialised_bytes is your encoded payload
DataSanitizer.validate_value_size(serialised_bytes)

# Validate tenant context (raises ValueError on invalid tenant_id or namespace)
DataSanitizer.validate_tenant(tenant)
```
| Validation | Rule |
|---|---|
| Key type | Must be a non-empty `str` |
| Key length | ≤ 512 characters |
| Key characters | No `\x00`, `\r`, `\n` |
| Key patterns | No `../`, `..\`, `\x00` (path traversal prevention) |
| Value size | ≤ 100 MB |
| Tenant context | `tenant_id` and `namespace` must be non-empty strings |
| Tenant characters | No `..`, `/`, `\`, `\x00` in `tenant_id` or `namespace` |
Keys are stripped of leading/trailing whitespace before use.
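The key rules in the table can be sketched as one function. This is an illustration, not `DataSanitizer` itself: it assumes whitespace is stripped before the other checks run (so a trailing newline is removed rather than rejected), and it raises `ValueError` for every violation, as the comments in the example above describe.

```python
MAX_KEY_LEN = 512
FORBIDDEN_CHARS = ("\x00", "\r", "\n")
TRAVERSAL_PATTERNS = ("../", "..\\")


def validate_key(key: object) -> str:
    """Return the stripped key or raise ValueError (hypothetical re-implementation)."""
    if not isinstance(key, str) or not key.strip():
        raise ValueError("key must be a non-empty string")
    key = key.strip()  # assumption: strip first, then apply the remaining rules
    if len(key) > MAX_KEY_LEN:
        raise ValueError(f"key longer than {MAX_KEY_LEN} characters")
    if any(c in key for c in FORBIDDEN_CHARS):
        raise ValueError("key contains a forbidden control character")
    if any(p in key for p in TRAVERSAL_PATTERNS):
        raise ValueError("key contains a path-traversal sequence")
    return key
```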
7.6 Audit Log
Every access control decision (granted or denied) is recorded with:
```python
{
    "ts": "2026-04-21T09:30:00.123+00:00",
    "tenant_id": "acme",
    "namespace": "prod",
    "user_id": "alice",
    "op": "write",
    "category": "document",
    "granted": True,
    "reason": "ok",  # or "insufficient_role" | "policy_denied" | "rate_limited"
}
```
Retrieve via `pm.get_audit_log(tenant_id="acme", limit=500)`.
In production, forward the audit log to an append-only SIEM or database table by replacing or wrapping AccessControlManager._record_audit.
8. Storage Backends — Deep Dive
KeyValueStorage (in-memory / Redis shape)
Current implementation: Async in-memory dict with a background TTL reaper task (1-second sweep). Access is serialised with `asyncio.Lock`, which prevents interleaving between coroutines on one event loop but is not a thread lock.
Production swap: Replace `_store` dict operations with `aioredis` calls (aioredis has since been merged into redis-py as `redis.asyncio`). The `get`, `set`, `delete`, `expire`, `get_ttl`, and batch methods map 1:1 to Redis commands.
Key features:
- TTL enforcement via a background coroutine that sweeps `_ttl_index` every second.
- `get_ttl()` returns remaining seconds.
- `expire()` updates an existing key's TTL without touching its value.
- Metadata indexed in `_meta_index` for lifecycle management.
When to use: API response caching, session data, short-lived computation results, rate-limit counters.
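The TTL mechanics described here (absolute expiry times in a `_ttl_index`, a TTL check on read, a periodic reaper) fit in a short sketch. `TTLStore` is hypothetical and synchronous apart from the reaper loop. Because no method awaits between reading and writing the dicts, it needs no `asyncio.Lock`; an implementation that awaits inside a critical section (for example, a Redis round-trip) would.

```python
import asyncio
import time
from typing import Any, Callable, Dict, Optional


class TTLStore:
    """In-memory dict with per-key expiry, a check on get(), and a periodic reaper."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._store: Dict[str, Any] = {}
        self._ttl_index: Dict[str, float] = {}  # key -> absolute expiry time
        self._clock = clock

    def set(self, key: str, value: Any, ttl_seconds: Optional[float] = None) -> None:
        self._store[key] = value
        if ttl_seconds is None:
            self._ttl_index.pop(key, None)
        else:
            self._ttl_index[key] = self._clock() + ttl_seconds

    def get(self, key: str) -> Optional[Any]:
        expires = self._ttl_index.get(key)
        if expires is not None and expires <= self._clock():
            self._delete(key)  # expired: treat as a miss even if the reaper has not run yet
            return None
        return self._store.get(key)

    def expire(self, key: str, ttl_seconds: float) -> bool:
        if key not in self._store:
            return False
        self._ttl_index[key] = self._clock() + ttl_seconds  # value left untouched
        return True

    def get_ttl(self, key: str) -> Optional[int]:
        expires = self._ttl_index.get(key)
        return None if expires is None else max(0, int(expires - self._clock()))

    def sweep(self) -> int:
        now = self._clock()
        expired = [k for k, t in self._ttl_index.items() if t <= now]
        for k in expired:
            self._delete(k)
        return len(expired)

    def _delete(self, key: str) -> None:
        self._store.pop(key, None)
        self._ttl_index.pop(key, None)

    async def reaper(self, interval: float = 1.0) -> None:
        # Start with asyncio.create_task(store.reaper()) from inside a running event loop.
        while True:
            self.sweep()
            await asyncio.sleep(interval)
```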
VectorStorage (NumPy / FAISS shape)
Current implementation: Pure-NumPy cosine similarity index (VectorIndex). L2-normalises all vectors on insert. Searches via matrix multiplication (O(n·d)).
Production swap: Replace VectorIndex with faiss.IndexFlatIP for CPU, faiss.IndexIVFFlat for large-scale approximate search, or pgvector for PostgreSQL-backed persistence.
Key features:
- One `VectorIndex` instance per `tenant.storage_prefix()`, giving strict isolation.
- `add()` is idempotent: re-adding an existing `vec_id` updates it in place.
- `filter_meta` post-filters results by exact metadata match before returning `k` results (oversamples 3× internally, so a selective filter can still return fewer than `k` results).
- Embedding dimension must match `config.vector_config.extra["embedding_dim"]`.
When to use: Semantic search over document chunks, nearest-neighbour retrieval, duplicate detection.
Dimension mismatch: Both store_embedding() and VectorStorage.set() return PersistenceResult.fail(...) on dimension mismatch — no exception propagates.
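The index behaviour listed above (normalise on insert, rank with one matrix-vector product, update in place on re-add, oversample 3× before metadata post-filtering) fits in a short NumPy sketch. `TinyVectorIndex` is a hypothetical stand-in for `VectorIndex`; unlike `VectorStorage.set()`, it raises on a dimension mismatch, and its list-based id lookup is O(n), which a dict would replace in anything larger than a demo.

```python
from typing import Any, Dict, List, Optional, Tuple

import numpy as np


class TinyVectorIndex:
    """Cosine-similarity index: L2-normalise on insert, rank by inner product."""

    def __init__(self, dim: int) -> None:
        self.dim = dim
        self.ids: List[str] = []
        self.meta: List[Dict[str, Any]] = []
        self.vectors = np.empty((0, dim), dtype=np.float32)

    @staticmethod
    def _normalise(v: np.ndarray) -> np.ndarray:
        norm = np.linalg.norm(v)
        return v / norm if norm > 0 else v

    def add(self, vec_id: str, vector: np.ndarray,
            meta: Optional[Dict[str, Any]] = None) -> None:
        if vector.shape != (self.dim,):
            raise ValueError(f"expected shape ({self.dim},), got {vector.shape}")
        v = self._normalise(vector.astype(np.float32))
        if vec_id in self.ids:  # idempotent: update in place
            i = self.ids.index(vec_id)
            self.vectors[i] = v
            self.meta[i] = meta or {}
        else:
            self.ids.append(vec_id)
            self.meta.append(meta or {})
            self.vectors = np.vstack([self.vectors, v])

    def search(self, query: np.ndarray, k: int = 10,
               filter_meta: Optional[Dict[str, Any]] = None
               ) -> List[Tuple[str, float, Dict[str, Any]]]:
        if not self.ids:
            return []
        scores = self.vectors @ self._normalise(query.astype(np.float32))  # O(n·d)
        fetch = k * 3 if filter_meta else k  # oversample before post-filtering
        results = []
        for i in np.argsort(-scores)[:fetch]:
            m = self.meta[i]
            if filter_meta and any(m.get(f) != val for f, val in filter_meta.items()):
                continue
            results.append((self.ids[i], float(scores[i]), m))
        return results[:k]
```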
DatabaseStorage (aiosqlite / asyncpg shape)
Current implementation: aiosqlite with an in-process SQLite database. Supports :memory: for testing.
Production swap: Replace `aiosqlite.connect` with `asyncpg.connect` to target PostgreSQL. The upsert clause `ON CONFLICT (...) DO UPDATE SET ...` uses the same syntax in SQLite and PostgreSQL, but parameter placeholders differ: `?` for SQLite versus `$1`, `$2`, ... for asyncpg.
Schema:
- `persistence_store`: primary key-value store with full metadata columns.
- `version_history`: append-only version snapshots.
Indices on `(tenant_id, namespace)`, `category`, and `tier` support efficient filtering.
Key features:
- `get()` checks TTL by comparing `created_at + ttl_seconds` against the current time.
- Every `set()` appends a snapshot row to `version_history`.
- `get_version_history()` and `restore_version()` are available directly on `DatabaseStorage`.
- `access_count` is incremented atomically on every `get()`.
When to use: Structured metadata, audit records, checkpoints, version history that must survive process restarts.
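A trimmed `sqlite3` sketch of the two-table layout: the upsert bumps a version counter on the live row and, in the same transaction, appends a row to `version_history`. Column names beyond those mentioned in this section are assumptions; the metadata, TTL, and access-count columns are left out; and the stdlib `sqlite3` module stands in for aiosqlite. The `ON CONFLICT ... DO UPDATE` upsert needs SQLite 3.24 or later.

```python
import sqlite3
import time
from typing import List, Tuple

SCHEMA = """
CREATE TABLE IF NOT EXISTS persistence_store (
    tenant_id TEXT NOT NULL,
    namespace TEXT NOT NULL,
    key TEXT NOT NULL,
    value BLOB,
    version INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (tenant_id, namespace, key)
);
CREATE TABLE IF NOT EXISTS version_history (
    tenant_id TEXT NOT NULL,
    namespace TEXT NOT NULL,
    key TEXT NOT NULL,
    version INTEGER NOT NULL,
    value BLOB,
    created_at REAL NOT NULL
);
"""


def upsert(conn: sqlite3.Connection, tenant: str, ns: str, key: str, value: bytes) -> int:
    with conn:  # one transaction: upsert the live row, then append a history row
        conn.execute(
            """INSERT INTO persistence_store (tenant_id, namespace, key, value)
               VALUES (?, ?, ?, ?)
               ON CONFLICT (tenant_id, namespace, key)
               DO UPDATE SET value = excluded.value,
                             version = persistence_store.version + 1""",
            (tenant, ns, key, value),
        )
        (version,) = conn.execute(
            "SELECT version FROM persistence_store "
            "WHERE tenant_id = ? AND namespace = ? AND key = ?",
            (tenant, ns, key),
        ).fetchone()
        conn.execute(
            "INSERT INTO version_history VALUES (?, ?, ?, ?, ?, ?)",
            (tenant, ns, key, version, value, time.time()),
        )
    return version


def history(conn: sqlite3.Connection, tenant: str, ns: str, key: str) -> List[Tuple[int, bytes]]:
    return conn.execute(
        "SELECT version, value FROM version_history "
        "WHERE tenant_id = ? AND namespace = ? AND key = ? ORDER BY version DESC",
        (tenant, ns, key),
    ).fetchall()
```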
ObjectStorage (local filesystem / S3 shape)
Current implementation: Local filesystem with one file per object. Metadata stored as a JSON sidecar (.meta.json).
Production swap: Replace read_bytes/write_bytes with aioboto3 S3 calls. Key naming conventions are identical.
Key features:
- Path sanitisation: `/` → `_` and `..` → `__` in key names.
- Async I/O via `loop.run_in_executor(None, ...)` to avoid blocking the event loop.
- TTL checked at read time from the metadata sidecar.
- `list_keys()` lists actual files; `.meta.json` sidecars are excluded.
When to use: Large document blobs (>1 MB), file uploads, multi-part objects, audio/video assets.
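The sanitisation rule, JSON sidecar, and executor-based I/O can be sketched as below. `LocalObjectStore` and `safe_name` are hypothetical names. Note that `list_keys()` here returns sanitised file names, so a key such as `docs/../secret` comes back as `docs____secret`, and two different keys (for example `a/b` and `a_b`) can map to the same file.

```python
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, List

SIDECAR_SUFFIX = ".meta.json"


def safe_name(key: str) -> str:
    # Mirrors the documented rules: ".." -> "__" and "/" -> "_".
    return key.replace("..", "__").replace("/", "_")


class LocalObjectStore:
    def __init__(self, base_path: str) -> None:
        self.base = Path(base_path)
        self.base.mkdir(parents=True, exist_ok=True)

    def _write(self, key: str, data: bytes, meta: Dict[str, Any]) -> None:
        path = self.base / safe_name(key)
        path.write_bytes(data)
        Path(str(path) + SIDECAR_SUFFIX).write_text(json.dumps(meta))

    async def put(self, key: str, data: bytes, meta: Dict[str, Any]) -> None:
        loop = asyncio.get_running_loop()
        # Blocking file I/O runs in the default thread pool, keeping the event loop free.
        await loop.run_in_executor(None, self._write, key, data, meta)

    async def get(self, key: str) -> bytes:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, (self.base / safe_name(key)).read_bytes)

    def list_keys(self) -> List[str]:
        return sorted(p.name for p in self.base.iterdir()
                      if not p.name.endswith(SIDECAR_SUFFIX))
```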
Backend Comparison
| Aspect | KeyValueStorage | VectorStorage | DatabaseStorage | ObjectStorage |
|---|---|---|---|---|
| Primary use | Caching, sessions | Semantic search | Metadata, audits | Large documents |
| Persistence | None (in-memory) | None (in-memory) | SQLite / PostgreSQL | Filesystem / S3 |
| Query type | Exact key | Cosine similarity | SQL (exact + range) | Exact key |
| TTL support | Native | Via metadata | Via SQL check | Via metadata sidecar |
| Max value size | 100 MB (validated) | `embedding_dim` floats | 100 MB (validated) | Unlimited |
| Concurrency control | `asyncio.Lock` | `asyncio.Lock` | `aiosqlite` | `run_in_executor` |
| Version history | DataVersion only | DataVersion only | Full SQL snapshots | DataVersion only |
| Production swap | aioredis | FAISS / pgvector | asyncpg (PostgreSQL) | aioboto3 (S3) |
9. Observability & Metrics
Per-Backend Metrics
pm.get_metrics() aggregates from all four backends and the lifecycle manager:
```python
metrics = pm.get_metrics()
```
Each backend section contains:
| Metric | Description |
|---|---|
| `reads` | Total `get()` calls (including misses). |
| `writes` | Total `set()` calls. |
| `deletes` | Total `delete()` calls. |
| `errors` | Operations that returned `success=False`. |
| `latency_total_ms` | Cumulative latency across all operations. |
| `avg_latency_ms` | `latency_total_ms / (reads + writes + deletes)`. |
```python
for name in ["key_value", "vector", "relational", "object"]:
    avg = metrics[name]["avg_latency_ms"]
    if avg > 100:
        alert(f"High latency on {name}: {avg:.1f}ms")  # alert() is your own alerting hook
```
Lifecycle Stats
```python
metrics["lifecycle_stats"]
# {"sweeps": 14, "evicted": 23, "promoted": 0}
```
| Key | Description |
|---|---|
| `sweeps` | Number of completed lifecycle sweeps. |
| `evicted` | Entries removed due to TTL expiry. |
| `promoted` | (Reserved for future tier promotion logic.) |
Audit Log Analytics
```python
from collections import Counter

log = pm.get_audit_log(tenant_id="acme", limit=10000)

# Compute denial rate
denied = [e for e in log if not e["granted"]]
denial_rate = len(denied) / len(log) if log else 0.0

# Top denied operations
top_denied = Counter(e["op"] for e in denied).most_common(5)

# Rate limiting events
rate_limited = [e for e in log if e["reason"] == "rate_limited"]
```
Health Monitoring
```python
health = await pm.health_check()
# {"key_value": True, "vector": True, "relational": True, "object": True}
if not all(health.values()):
    unhealthy = [k for k, v in health.items() if not v]
    alert(f"Unhealthy backends: {unhealthy}")
```
Batch Success Rate
```python
batch = await pm.batch_store(documents, tenant)
print(f"Success rate: {batch.success_rate:.1%}")
print(f"Failed keys: {list(batch.errors.keys())}")
```
Storage Cost Estimation
```python
from fennec_memory.persistence import DataTier

for tier in [DataTier.HOT, DataTier.WARM, DataTier.COLD, DataTier.FROZEN]:
    cost = pm.estimate_storage_cost(tier, size_gb=1000, months=12)
    print(f"{tier.value:10s}: {cost:8.2f} units/year")
# hot       : 12000.00 units/year
# warm      :  3600.00 units/year
# cold      :   600.00 units/year
# frozen    :    60.00 units/year
```
10. Edge Cases & Failure Handling
Backend Connection Failure
PersistenceManager.initialize() calls all four backend.connect() methods concurrently via asyncio.gather. If any single backend fails to connect, the exception propagates to the caller — the PersistenceManager is not left in a partially initialised state.
Mitigation: Wrap PersistenceManager.create() in a retry loop with exponential backoff for transient connection errors (e.g., Redis not yet ready during container startup).
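A generic retry helper for this mitigation. It retries only the exception types passed in `retry_on` (here `OSError` and `asyncio.TimeoutError`, an assumption about what a not-yet-ready backend raises), doubles the delay on each attempt up to `max_delay`, and adds a little random jitter so that many containers do not retry in lockstep. `retry_with_backoff` is a hypothetical helper, not part of the module.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    factory: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError, asyncio.TimeoutError),
) -> T:
    """Await factory(); on a retryable error wait base_delay * 2**n (plus jitter) and retry."""
    for attempt in range(attempts):
        try:
            return await factory()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
    raise RuntimeError("attempts must be >= 1")
```

Inside an async function this would be used as `pm = await retry_with_backoff(lambda: PersistenceManager.create(config))`; the lambda matters, because each attempt needs a fresh coroutine.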
SQLite / Database Failure
DatabaseStorage wraps all reads and writes in aiosqlite coroutines. If the database file is corrupt, inaccessible, or locked, the underlying aiosqlite exception propagates through get() / set() and surfaces as an unhandled exception in the caller. health_check() will return False.
Mitigation: Use a non-:memory: db_path for any production data. Back up the SQLite file regularly. For production, migrate to PostgreSQL via asyncpg.
Vector Dimension Mismatch
store_embedding() checks vector.shape[0] against config.vector_config.extra["embedding_dim"]. On mismatch, it returns PersistenceResult.fail(key, WRITE, "Vector dim mismatch: ...") — no exception. The VectorStorage.set() method performs the same check and returns fail(...) inline.
Mitigation: Set embedding_dim in PersistenceManagerConfig.vector_config.extra to match your embedding model's output dimension before the first write.
Quota / Value Size Exceeded
DataSanitizer.validate_value_size() raises ValueError for values larger than 100 MB. This exception propagates directly from store().
Mitigation: Pre-chunk large documents at the application layer before calling store(). Use batch_store() with category=DataCategory.DOCUMENT to store chunks in parallel.
Key Expiry Race Condition
A key may expire between an exists() check and a subsequent get(). Both KeyValueStorage and DatabaseStorage perform TTL checks inside get() itself and return PersistenceResult.fail(key, READ, "Key expired") when the key has elapsed. Always check result.success rather than assuming a key is live after exists().
Corrupted or Missing Snapshot
BackupManager.restore_snapshot() loads snapshot data with pickle.loads. A corrupt data.pkl file raises pickle.UnpicklingError. If snapshot_id is not in the in-memory _snapshots dict and data.pkl does not exist on disk, restore_snapshot() returns BatchResult(total=0, succeeded=0, failed=1, errors={"restore": "Snapshot not found"}).
Mitigation: Verify snapshots after creation by calling list_backups() and checking key_count > 0. Store snapshots on durable, replicated storage in production.
Missing Tenant / Permission Denied
If TenantContext.roles does not include a role with the required permission, AccessControlManager.require_permission() raises PermissionError before any backend is contacted. The exception message includes tenant_id, the operation, and the namespace.
If tenant_id or namespace is empty or contains forbidden characters (.., /, \, \x00), DataSanitizer.validate_tenant() raises ValueError immediately.
Transaction Partial Failure
TransactionManager.commit() executes staged operations sequentially. If any operation fails, it runs compensating deletes in reverse order and marks the transaction "rolled_back". Compensating deletes that themselves fail are logged as errors but do not raise — the system records the failure and continues.
Mitigation: Always use async with pm.transaction() as txn_id: (the context manager) rather than calling begin() / commit() manually. The context manager guarantees rollback() is called on any exception. Avoid staging txn_delete operations for data that must be recoverable — compensating ops cannot restore deleted data.
Encryption Key Missing or Invalid
If PERSISTENCE_MASTER_KEY is absent from the environment and no master_key is supplied in the config, EncryptionEngine uses a deterministic development key derived from the string "dev-only-key". This key is not secret. Never store sensitive data with the dev key in any environment beyond local development.
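If a `retrieve()` call attempts to decrypt a blob that was not encrypted (e.g., the key was stored with `encrypt=False`), the `decrypt()` call raises a `cryptography` exception. `PersistenceManager.retrieve()` silently catches this and returns the raw bytes as-is. Since an unencrypted blob and a tampered ciphertext both fail GCM tag verification, this fallback can also return tampered ciphertext as raw bytes instead of raising.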
Async Context Without Running Event Loop
All PersistenceManager methods are async. Calling them from synchronous code requires asyncio.run():
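```python
import asyncio

# Correct in sync context
result = asyncio.run(pm.retrieve("key", tenant))

# Wrong: returns a coroutine, not the result
result = pm.retrieve("key", tenant)  # ← missing await / asyncio.run
```
`KeyValueStorage` spawns a background TTL reaper coroutine via `asyncio.create_task()` inside `connect()`. This requires an active event loop when `connect()` is called. Always call `PersistenceManager.create()` (or `initialize()`) from within an async function or `asyncio.run()`. Because each `asyncio.run()` call creates a new event loop and cancels any tasks still pending when it returns (including the TTL reaper), prefer a single `asyncio.run(main())` entry point that creates the manager and performs all calls, rather than wrapping individual calls in separate `asyncio.run()` invocations.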
11. Advanced Usage
Multi-Tenant Setup
```python
import asyncio
import os

from fennec_memory.persistence import (
    PersistenceManager, PersistenceManagerConfig,
    TenantContext, DataCategory, DataTier,
    AccessPolicy, Permission, StorageConfig, StorageType,
)


async def main():
    config = PersistenceManagerConfig(
        db_config=StorageConfig(
            storage_type=StorageType.RELATIONAL,
            extra={"db_path": "/var/data/rag.db"},
        ),
        master_key=bytes.fromhex(os.environ["MASTER_KEY_HEX"]),
        enable_encryption=True,
    )
    manager = await PersistenceManager.create(config)
    async with manager as pm:
        # Tenant A: full access
        tenant_a = TenantContext("acme", "prod", roles=["admin"])

        # Tenant B: read-only, rate-limited to 50 ops/sec
        tenant_b = TenantContext("globalcorp", "prod", roles=["reader"])
        pm.set_access_policy(AccessPolicy(
            tenant_id="globalcorp",
            namespace="prod",
            allowed_ops={Permission.READ, Permission.LIST},
            denied_ops={Permission.WRITE, Permission.DELETE},
            rate_limit=50,
        ))

        # Tenant A stores a document; Tenant B cannot see it
        await pm.store("confidential:q4_plan", b"...", tenant_a,
                       category=DataCategory.DOCUMENT)

        # Tenant B trying to write raises PermissionError
        try:
            await pm.store("any_key", b"data", tenant_b,
                           category=DataCategory.DOCUMENT)
        except PermissionError as e:
            print(f"Correctly denied: {e}")


asyncio.run(main())
```
Embedding Storage and Retrieval Pipeline
```python
from fennec_memory.persistence import DataCategory


async def rag_ingest(pm, tenant, docs, embed_fn):
    """Ingest documents with embeddings."""
    for doc_id, text in docs.items():
        # 1. Store raw document
        await pm.store(f"doc:{doc_id}", text.encode(), tenant,
                       category=DataCategory.DOCUMENT)
        # 2. Chunk and embed (chunk_text is an application-supplied helper, not part of the module)
        chunks = chunk_text(text, size=512)
        for i, chunk in enumerate(chunks):
            vec = embed_fn(chunk)
            await pm.store_embedding(
                key=f"chunk:{doc_id}:{i}",
                vector=vec,
                tenant=tenant,
                meta={"doc_id": doc_id, "chunk_idx": i, "text": chunk[:100]},
            )


async def rag_retrieve(pm, tenant, query, embed_fn, k=5):
    """Retrieve relevant chunks for a query."""
    query_vec = embed_fn(query)
    results = await pm.vector_search(query_vec, tenant, k=k)
    context_parts = []
    for key, score, meta in results:
        context_parts.append({
            "doc_id": meta["doc_id"],
            "chunk": meta["text"],  # only the first 100 characters stored at ingest time
            "score": score,
        })
    return context_parts
```
Atomic Cross-Backend Transactions
```python
async def update_doc_and_meta(pm, tenant, doc_id, new_content, new_meta):
    """Update document bytes and metadata in one transaction."""
    async with pm.transaction() as txn_id:
        await pm.txn_set(
            txn_id,
            key=f"doc:{doc_id}",
            value=new_content,
            tenant=tenant,
            backend_name="object",
        )
        await pm.txn_set(
            txn_id,
            key=f"meta:{doc_id}",
            value=new_meta,
            tenant=tenant,
            backend_name="db",
        )
    # Both writes commit when the block exits. An exception inside the block discards
    # both staged writes; if one write fails during commit, the other is undone with a
    # compensating delete, so its previous value is not restored.
```
Version Rollback Workflow
```python
async def safe_update(pm, tenant, key, new_value, category):
    """Update a key, preserving the ability to roll back."""
    # Check current version before modifying
    history = pm.get_version_history(key, tenant, limit=1)
    old_version_id = history[0].version_id if history else None

    # Apply new value
    result = await pm.store(key, new_value, tenant, category=category)

    # On downstream error, roll back (validation_fails is your own check, not part of the module)
    if validation_fails(result):
        if old_version_id:
            await pm.rollback_to_version(key, tenant, old_version_id, category)
        else:
            await pm.delete(key, tenant, category)
```
Custom Routing for Compliance
```python
from fennec_memory.persistence import (
    PersistenceManager, RoutingRule, DataCategory, StorageType, DataTier,
)


async def configure_routing(config):
    # Wrapped in a coroutine because `await` is only valid inside async functions
    pm = await PersistenceManager.create(config)
    # Force all audit data to a dedicated relational database, never KV
    pm.add_routing_rule(RoutingRule(
        name="audit_always_to_db",
        priority=1,  # highest priority: overrides all defaults
        condition=lambda cat, _size, _ext: cat == DataCategory.AUDIT,
        target=StorageType.RELATIONAL,
        tier_hint=DataTier.COLD,
        description="Compliance: audit records must persist in the relational DB",
    ))
    return pm
```
End-to-End RAG Pipeline Example
```python
import asyncio
import os

import numpy as np

from fennec_memory.persistence import (
    PersistenceManager, PersistenceManagerConfig,
    TenantContext, DataCategory, DataTier,
    StorageConfig, StorageType,
    AccessPolicy, Permission,
)


async def main():
    config = PersistenceManagerConfig(
        vector_config=StorageConfig(
            storage_type=StorageType.VECTOR,
            extra={"embedding_dim": 1536},
        ),
        db_config=StorageConfig(
            storage_type=StorageType.RELATIONAL,
            extra={"db_path": "./rag_meta.db"},
        ),
        object_config=StorageConfig(
            storage_type=StorageType.OBJECT,
            extra={"base_path": "./rag_docs"},
        ),
        enable_encryption=True,
        # Exactly 32 bytes, read from the environment rather than hardcoded
        master_key=bytes.fromhex(os.environ["MASTER_KEY_HEX"]),
        backup_base_path="./backups",
    )
    persistence = await PersistenceManager.create(config)
    async with persistence as pm:
        # Access policy; DELETE is allowed because the transaction below deletes a key
        pm.set_access_policy(AccessPolicy(
            tenant_id="acme",
            namespace="prod",
            allowed_ops={Permission.READ, Permission.WRITE, Permission.DELETE,
                         Permission.LIST, Permission.BACKUP},
            rate_limit=500,
        ))
        tenant = TenantContext(
            tenant_id="acme",
            namespace="prod",
            user_id="pipeline@acme.com",
            roles=["editor"],  # can read, write, delete, list
        )

        # Store document
        doc_result = await pm.store(
            key="doc:product-manual-v2",
            value=b"<product manual content...>",
            tenant=tenant,
            category=DataCategory.DOCUMENT,
            tags={"type": "manual", "product": "widget-x"},
        )
        print(f"Document stored: v{doc_result.version.version_num}")

        # Store embedding (random vector as a placeholder for real model output)
        embedding = np.random.randn(1536).astype(np.float32)
        await pm.store_embedding(
            key="emb:product-manual-v2",
            vector=embedding,
            tenant=tenant,
            meta={"doc_key": "doc:product-manual-v2", "chunk": 0},
        )

        # Semantic search
        query_vec = np.random.randn(1536).astype(np.float32)
        search_results = await pm.vector_search(
            query_vector=query_vec, tenant=tenant, k=3,
        )
        for key, score, meta in search_results:
            print(f"  [{score:.3f}] {key}")

        # Version history
        history = pm.get_version_history("doc:product-manual-v2", tenant)
        print(f"Versions: {len(history)}")

        # Transaction
        async with pm.transaction() as txn_id:
            await pm.txn_set(
                txn_id, "meta:ingestion:run-42",
                {"status": "complete", "docs": 150}, tenant, "db",
            )
            await pm.txn_delete(txn_id, "meta:ingestion:run-41", tenant, "db")

        # Backup
        backup_tenant = TenantContext(
            tenant_id="acme", namespace="prod", roles=["backup_op"],
        )
        snap = await pm.create_backup(
            tenant=backup_tenant,
            storage_type=StorageType.RELATIONAL,
            tags={"type": "scheduled"},
        )
        print(f"Backup: {snap.key_count} keys, {snap.size_bytes} bytes")

        # Health and metrics
        health = await pm.health_check()
        metrics = pm.get_metrics()
        print(f"Health: {health}")
        print(f"KV writes: {metrics['key_value']['writes']}")

        # Cost estimation
        hot_cost = pm.estimate_storage_cost(DataTier.HOT, size_gb=1.0)
        cold_cost = pm.estimate_storage_cost(DataTier.COLD, size_gb=1.0)
        print(f"Saving by moving to COLD: {(1 - cold_cost / hot_cost):.0%}")


asyncio.run(main())
```
Production Deployment Notes
Encryption key management: Never hardcode `master_key`. Store it in HashiCorp Vault, AWS Secrets Manager, or GCP Secret Manager. Retrieve it at startup and inject it as bytes:
```python
import base64

# vault_client stands for your secrets-manager client; read_secret is a placeholder call
master_key = base64.b64decode(vault_client.read_secret("persistence_master_key"))
config = PersistenceManagerConfig(master_key=master_key, enable_encryption=True)
```
Backend swap procedure: To switch from SQLite to PostgreSQL, update `db_config` to point at the PostgreSQL host and port, set `username` and `password`, replace `aiosqlite.connect` in `DatabaseStorage` with an `asyncpg` connection pool, and convert SQL parameter placeholders from `?` to `$1`, `$2`, .... No changes to `PersistenceManager` or callers are needed.
Concurrency: PersistenceManager is designed for a single-process async application. For multi-process deployments, replace KeyValueStorage with a Redis-backed implementation so all processes share the same cache state. DatabaseStorage and ObjectStorage can share a single PostgreSQL/S3 endpoint across processes natively.
Lifecycle sweep interval: For production deployments with high cache churn, lower lifecycle_sweep_secs to 10–30 seconds. For deployments with predominantly long-TTL data, 300 seconds is sufficient and reduces CPU overhead.
Backup strategy: Schedule create_backup() at least daily for each active tenant on DatabaseStorage. Offload snapshot files to object storage (S3, GCS) immediately after creation. Retain 30 days of snapshots minimum for compliance. Automate delete_snapshot() for snapshots older than your retention policy.
Audit log overflow: The in-process _audit_log list grows indefinitely. In production, override AccessControlManager._record_audit() to write directly to an append-only database table or ship to a SIEM (Splunk, Datadog, OpenSearch) using an async log handler. The default implementation is suitable only for development and short-lived services.