Fennec Persistence Module

Enterprise-Grade Multi-Backend Persistence Layer for RAG Applications


Table of Contents

  1. Overview
  2. System Architecture
  3. Core Concepts
  4. Quick Start Guide
  5. Public API Reference
  6. Configuration System
  7. Security Model
  8. Storage Backends — Deep Dive
  9. Observability & Metrics
  10. Edge Cases & Failure Handling
  11. Advanced Usage

1. Overview

The Fennec Persistence Module is a production-grade, multi-tenant persistence layer designed for RAG (Retrieval-Augmented Generation) applications and LLM pipelines. It unifies four fundamentally different storage backends — key-value, vector, relational, and object storage — behind a single coherent API, with automatic routing, encryption, versioning, access control, and lifecycle management built in.

Why It Exists

RAG systems require at least three distinct storage paradigms simultaneously: a fast key-value store for caching, a vector index for semantic search, and a relational database for structured metadata. Managing each backend independently creates duplication, inconsistent security boundaries, and fragile operational procedures. The Fennec Persistence Module solves this by providing a single entry point — PersistenceManager — that makes the right storage decision automatically based on data type and size, while enforcing uniform encryption, RBAC, and audit logging across all backends.

Real-World Use Cases

| Use Case | How the Module Helps |
| --- | --- |
| LLM response caching | Route DataCategory.CACHE entries to KeyValueStorage with automatic TTL enforcement |
| Embedding storage & retrieval | Store and search DataCategory.EMBEDDING vectors with per-tenant isolation |
| RAG document ingestion | Persist large document chunks to ObjectStorage; metadata to DatabaseStorage |
| Multi-tenant SaaS | Complete namespace isolation per tenant; per-tenant RBAC, rate limiting, and encryption keys |
| Audit & compliance | Append-only audit log for every READ, WRITE, DELETE, BACKUP operation |
| Disaster recovery | Point-in-time snapshots with one-command restore |
| Data versioning | Full version history with rollback for any stored key |

2. System Architecture

Pipeline Overview

┌─────────────────────────────────────────────────────────────────────┐
│                         WRITE PATH                                  │
│                                                                     │
│  pm.store(key, value, tenant, category)                             │
│         │                                                           │
│         ▼                                                           │
│  [DataSanitizer]    ← validate key, tenant, value size             │
│         │                                                           │
│         ▼                                                           │
│  [AccessControlManager]  ← RBAC + policy check + rate limit       │
│         │                                                           │
│         ▼                                                           │
│  [StorageRouter]    ← route by DataCategory + size → StorageType   │
│         │                                                           │
│         ├── EMBEDDING  → VectorStorage  (FAISS / pgvector)         │
│         ├── DOCUMENT   → ObjectStorage  (local / S3)               │
│         ├── CACHE      → KeyValueStorage (in-memory / Redis)       │
│         └── METADATA   → DatabaseStorage (SQLite / PostgreSQL)     │
│                                                                     │
│  [EncryptionEngine]  ← AES-256-GCM, per-tenant derived key        │
│         │                                                           │
│         ▼                                                           │
│  [VersionManager]   ← SHA-256 checksum + version chain            │
│         │                                                           │
│         ▼                                                           │
│  [Backend.set()]    ← atomic write to chosen backend               │
│         │                                                           │
│         ▼                                                           │
│  [DatabaseStorage]  ← metadata mirrored to relational DB           │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│                         READ PATH                                   │
│                                                                     │
│  pm.retrieve(key, tenant, category)                                 │
│         │                                                           │
│         ▼                                                           │
│  [DataSanitizer + AccessControlManager]                             │
│         │                                                           │
│         ▼                                                           │
│  [StorageRouter]    ← same routing logic as write path             │
│         │                                                           │
│         ▼                                                           │
│  [Backend.get()]    ← TTL checked, access_count incremented        │
│         │                                                           │
│         ▼                                                           │
│  [EncryptionEngine.decrypt()]  ← only if blob is encrypted         │
│         │                                                           │
│         ▼                                                           │
│  PersistenceResult  → returned to caller                           │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│                   BACKGROUND SERVICES                               │
│                                                                     │
│  LifecycleManager                                                   │
│    ├── TTL reaper    (KeyValueStorage: 1-second sweep)             │
│    └── Lifecycle sweep (configurable interval, default 60s)        │
│                                                                     │
│  TransactionManager                                                 │
│    └── 2PC-lite: stage → execute → compensate on failure          │
└─────────────────────────────────────────────────────────────────────┘

Component Responsibilities

| Component | Responsibility |
| --- | --- |
| PersistenceManager | Single entry point. Wires all subsystems. Exposes store, retrieve, delete, vector_search, batch_*, transaction, backup, and versioning APIs. |
| StorageRouter | Evaluates data category and payload size against a prioritised rule set to select the correct StorageType and DataTier. |
| VersionManager | Maintains an in-memory SHA-256 version chain per scoped key. Supports rollback to any historical version within max_versions. |
| BackupManager | Creates and restores pickle-serialised point-in-time snapshots. Writes a manifest.json alongside each snapshot. |
| TransactionManager | 2-phase-commit-lite across multiple backends. Stages operations, executes them in order, and executes compensating deletes on any failure. |
| LifecycleManager | Periodic background sweep enforcing TTL expiry and data tier recommendations. Exposes storage cost estimates. |
| EncryptionEngine | AES-256-GCM symmetric encryption. Derives a unique key per tenant from the master key using PBKDF2-SHA256. Produces self-describing blobs (version byte + nonce + ciphertext + tag). |
| AccessControlManager | RBAC with per-tenant policies, operation-level deny lists, and token-bucket rate limiting. Records every decision to an in-process audit log. |
| DataSanitizer | Input validation: key format, key length (≤512 chars), value size (≤100 MB), path traversal prevention on both keys and tenant IDs. |
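
The validation rules listed for DataSanitizer can be sketched roughly as follows. This is an illustration only, not the module's implementation: the function names, the empty-key check, and the use of binary megabytes are assumptions, and the forbidden sequences are taken from the key constraints documented for store().

```python
MAX_KEY_CHARS = 512
MAX_VALUE_BYTES = 100 * 1024 * 1024  # 100 MB; binary megabytes are an assumption

# Sequences rejected in keys: null bytes, CR, LF, and path traversal fragments.
FORBIDDEN_FRAGMENTS = ("\x00", "\r", "\n", "../", "..\\")


def validate_key(key: str) -> None:
    """Raise ValueError if the key breaks one of the documented constraints."""
    if not key:
        # Assumption: empty keys are rejected; the source does not say so.
        raise ValueError("key must not be empty")
    if len(key) > MAX_KEY_CHARS:
        raise ValueError(f"key exceeds {MAX_KEY_CHARS} characters")
    for fragment in FORBIDDEN_FRAGMENTS:
        if fragment in key:
            raise ValueError(f"key contains forbidden sequence {fragment!r}")


def validate_value_size(size_bytes: int) -> None:
    """Raise ValueError if the serialised value is larger than the limit."""
    if size_bytes > MAX_VALUE_BYTES:
        raise ValueError("value exceeds 100 MB")
```

Running the checks before any authorisation or backend call keeps malformed keys from ever reaching storage, which matches the validation-first ordering described in this section.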

Design Philosophy

  • One API, four backends. All backends share the same BasePersistence contract, making them interchangeable without caller changes.
  • Automatic routing. Callers declare what they are storing (DataCategory), not where. The router decides.
  • Tenant-first isolation. Every key is scoped as {tenant_id}::{namespace}::{key}. Cross-tenant reads are architecturally impossible through the public API.
  • Defense in depth. Validation → authorisation → encryption → versioning — in that order, always.
  • Production swap paths. Every backend has a documented one-line production replacement: in-memory dict → Redis, local filesystem → S3, SQLite → PostgreSQL, NumPy index → FAISS/pgvector.

3. Core Concepts

3.1 Data Categories and Routing

DataCategory is the primary routing signal. Each value maps to a default backend:

| DataCategory | Default Backend | Default Tier |
| --- | --- | --- |
| EMBEDDING | VectorStorage | HOT |
| DOCUMENT | ObjectStorage | WARM |
| CACHE | KeyValueStorage | HOT |
| METADATA | DatabaseStorage | HOT |
| CHECKPOINT | DatabaseStorage | HOT |
| AUDIT | DatabaseStorage | HOT |

Large payloads (>1 MB) are routed to ObjectStorage unless a higher-priority rule, such as embedding_to_vector, matches first (see the default rules in Section 5.2).

3.2 Data Tiers

DataTier describes the cost/performance trade-off:

| Tier | Relative Cost/GB/Month | Use Case |
| --- | --- | --- |
| HOT | 1.0× | Frequently accessed data such as embeddings and caches |
| WARM | 0.3× | Documents accessed a few times per day |
| COLD | 0.05× | Rarely accessed archives |
| FROZEN | 0.005× | Long-term audit retention |

3.3 TenantContext

Every operation requires a TenantContext. It carries the isolation namespace and RBAC roles:

from fennec_memory.persistence import TenantContext

tenant = TenantContext(
    tenant_id="acme",       # required: org/account identifier
    namespace="prod",       # required: environment/project qualifier
    user_id="alice",        # optional: for audit trails and version authorship
    session_id="sess_001",  # optional: for session-scoped audit
    roles=["writer"],       # RBAC roles applied to this operation
)

# Derived helpers
tenant.storage_prefix()      # → "acme::prod"
tenant.scoped_key("doc:001") # → "acme::prod::doc:001"
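
To make the scoping format concrete, here is a minimal stand-in for the two derived helpers. TenantContextSketch is a hypothetical class written for this illustration; only the `{tenant_id}::{namespace}::{key}` layout comes from the documentation.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TenantContextSketch:
    """Hypothetical stand-in for TenantContext, limited to key scoping."""

    tenant_id: str
    namespace: str
    user_id: Optional[str] = None
    roles: List[str] = field(default_factory=list)

    def storage_prefix(self) -> str:
        # Prefix shared by every key that belongs to this tenant and namespace.
        return f"{self.tenant_id}::{self.namespace}"

    def scoped_key(self, key: str) -> str:
        # The scoped key is what the backends would actually see.
        return f"{self.storage_prefix()}::{key}"
```

Because the caller-supplied key is always prefixed, two tenants that both store "doc:001" end up with different storage keys.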

3.4 PersistenceResult

All single-item operations return a PersistenceResult:

@dataclass
class PersistenceResult:
    success:      bool
    key:          str
    operation:    OperationType       # READ | WRITE | DELETE | UPDATE | LIST
    storage_type: Optional[StorageType]
    data:         Optional[Any]       # The retrieved value (on READ)
    metadata:     Optional[StorageMetadata]
    error:        Optional[str]       # Error message if success=False
    latency_ms:   float
    version:      Optional[DataVersion]
    from_cache:   bool

Factory methods:

PersistenceResult.ok(key, operation, data=..., latency_ms=..., ...)
PersistenceResult.fail(key, operation, error="reason")

3.5 BatchResult

All batch operations return a BatchResult:

@dataclass
class BatchResult:
    total:      int
    succeeded:  int
    failed:     int
    results:    List[PersistenceResult]
    errors:     Dict[str, str]    # key → error message for failed items
    latency_ms: float

# Computed properties
batch.success_rate  # float: succeeded / total
batch.all_succeeded # bool

3.6 Version Chain

Every store() call records a DataVersion linked to its predecessor:

@dataclass
class DataVersion:
    version_id:      str      # UUID
    version_num:     int      # monotonically increasing per key
    created_at:      datetime
    created_by:      Optional[str]   # tenant.user_id
    checksum:        Optional[str]   # SHA-256 of raw bytes
    parent_version:  Optional[str]   # previous version_id
    tags:            Dict[str, str]
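
The chain behaviour described here (checksum, parent link, monotonically increasing number, pruning) can be sketched as below. VersionEntry and VersionChainSketch are hypothetical names, and the choice to keep the counter running after pruning is an assumption rather than documented behaviour.

```python
import hashlib
import uuid
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, List, Optional


@dataclass
class VersionEntry:
    """Hypothetical, reduced stand-in for DataVersion."""

    version_id: str
    version_num: int
    checksum: str
    parent_version: Optional[str]


class VersionChainSketch:
    def __init__(self, max_versions: int = 50) -> None:
        self._max_versions = max_versions
        self._chains: Dict[str, Deque[VersionEntry]] = defaultdict(deque)
        self._counters: Dict[str, int] = defaultdict(int)

    def record(self, scoped_key: str, data: bytes) -> VersionEntry:
        chain = self._chains[scoped_key]
        parent = chain[-1].version_id if chain else None
        self._counters[scoped_key] += 1
        entry = VersionEntry(
            version_id=str(uuid.uuid4()),
            version_num=self._counters[scoped_key],
            checksum=hashlib.sha256(data).hexdigest(),
            parent_version=parent,
        )
        chain.append(entry)
        # Drop the oldest entries once the chain exceeds the retention limit.
        while len(chain) > self._max_versions:
            chain.popleft()
        return entry

    def history(self, scoped_key: str) -> List[VersionEntry]:
        # Latest first, mirroring the ordering described for version history.
        return list(reversed(self._chains[scoped_key]))
```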

3.7 Multi-Tenancy Isolation

Isolation is enforced at three levels:

  1. Key scoping: All storage writes and reads use tenant.scoped_key(key) as the actual storage key. A caller cannot read another tenant's data by guessing keys.
  2. Vector index isolation: VectorStorage maintains a separate VectorIndex instance per tenant.storage_prefix(). Semantic searches are inherently scoped.
  3. Encryption key isolation: EncryptionEngine derives a unique AES-256 key per tenant_id using PBKDF2-SHA256. Decrypting another tenant's ciphertext with the wrong derived key fails authentication (GCM tag verification).
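
As a rough illustration of per-tenant key derivation, the sketch below uses PBKDF2-SHA256 from the Python standard library. Using the tenant_id as the salt and 100,000 iterations are assumptions made for the example; the source does not state which salt or iteration count EncryptionEngine uses, and derive_tenant_key is a hypothetical name.

```python
import hashlib


def derive_tenant_key(master_key: bytes, tenant_id: str,
                      iterations: int = 100_000) -> bytes:
    """Derive a 32-byte (AES-256 sized) key for one tenant.

    Assumption: the tenant_id serves as the PBKDF2 salt. The real module
    may combine it with other material.
    """
    return hashlib.pbkdf2_hmac(
        "sha256",
        master_key,
        tenant_id.encode("utf-8"),
        iterations,
        dklen=32,
    )
```

Derivation is deterministic for a given master key and tenant, so the same tenant always gets the same key, while a different tenant_id yields an unrelated key. With an authenticated mode such as GCM, decrypting with the wrong key then fails tag verification instead of returning garbage.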

4. Quick Start Guide

Installation

pip install fennec-memory
pip install aiosqlite cryptography numpy
# Production extras:
pip install aioredis faiss-cpu aioboto3 asyncpg

Minimal Working Example

import asyncio
from fennec_memory.persistence import PersistenceManager, TenantContext, DataCategory

async def main():
    # 1. Create and initialize
    pm = await PersistenceManager.create()

    # 2. Define a tenant context
    tenant = TenantContext(
        tenant_id="acme",
        namespace="prod",
        user_id="alice",
        roles=["writer"],
    )

    # 3. Store a document
    result = await pm.store(
        key="doc:001",
        value=b"My RAG document content...",
        tenant=tenant,
        category=DataCategory.DOCUMENT,
        ttl=3600,
    )
    print(result.success, result.version.version_num)

    # 4. Retrieve it
    result = await pm.retrieve("doc:001", tenant, category=DataCategory.DOCUMENT)
    print(result.data[:40])

    # 5. Shut down
    await pm.shutdown()

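asyncio.run(main())

Alternatively, use the manager as an async context manager so that cleanup happens on exit:

async def cached_write():
    manager = await PersistenceManager.create()
    async with manager as pm:
        tenant = TenantContext(tenant_id="acme", namespace="prod", roles=["writer"])
        await pm.store("cache:response:123", b"cached answer", tenant,
                       category=DataCategory.CACHE, ttl=300)

asyncio.run(cached_write())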

Store → Retrieve → Delete Pattern

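# Run inside an async function. create() is a coroutine, so await it before entering the context.
async with await PersistenceManager.create() as pm: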
    # Store
    await pm.store("key", value, tenant, category=DataCategory.METADATA)

    # Retrieve
    result = await pm.retrieve("key", tenant, category=DataCategory.METADATA)
    if result.success:
        data = result.data
    else:
        print(f"Miss: {result.error}")

    # Delete
    await pm.delete("key", tenant, category=DataCategory.METADATA)

5. Public API Reference

5.1 PersistenceManager

The only class you need to import for normal usage.

from fennec_memory.persistence import PersistenceManager, PersistenceManagerConfig
# Full import paths (for direct subsystem access):
from fennec_memory.persistence import TenantContext, DataCategory, DataTier, StorageConfig, StorageType
from fennec_memory.persistence import AccessPolicy, Permission, EncryptionEngine
from fennec_memory.persistence import RoutingRule

Factory / Lifecycle


PersistenceManager.create
@classmethod
async def create(cls, config: Optional[PersistenceManagerConfig] = None) -> PersistenceManager

Creates, configures, and fully initialises a PersistenceManager. Calls initialize() internally. Returns a ready-to-use instance.

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| config | PersistenceManagerConfig | No | Full configuration object. If None, all defaults are used (in-memory backends, dev encryption key). |

# With defaults (dev/test)
pm = await PersistenceManager.create()

# With custom config
import os
from fennec_memory.persistence import PersistenceManagerConfig, StorageConfig, StorageType
config = PersistenceManagerConfig(
    db_config=StorageConfig(
        storage_type=StorageType.RELATIONAL,
        extra={"db_path": "/var/data/rag.db"},
    ),
    master_key=os.environ["MASTER_KEY"].encode(),
    enable_encryption=True,
)
pm = await PersistenceManager.create(config)

initialize
async def initialize(self) -> None

Connects all four backends concurrently and starts the LifecycleManager background sweep. Idempotent — safe to call multiple times.


shutdown
async def shutdown(self) -> None

Stops the lifecycle background task and disconnects all backends gracefully. Always call this (or use the async context manager) to avoid resource leaks.


health_check
async def health_check(self) -> Dict[str, bool]

Returns per-backend reachability status.

health = await pm.health_check()
# {"key_value": True, "vector": True, "relational": True, "object": True}

Core Store / Retrieve Operations


store

Universal write operation. Automatically routes to the correct backend, encrypts, versions, and enforces access control.

async def store(
    self,
    key:       str,
    value:     Any,
    tenant:    TenantContext,
    category:  DataCategory = DataCategory.DOCUMENT,
    ttl:       Optional[int] = None,
    tags:      Optional[Dict[str, str]] = None,
    tier:      Optional[DataTier] = None,
    encrypt:   bool = True,
    size_hint: int = 0,
) -> PersistenceResult

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| key | str | Yes | Storage key. Max 512 chars. No null bytes, \r, \n, ../, or ..\\. |
| value | Any | Yes | Data to store. Accepts bytes, str, np.ndarray, or any JSON-serialisable object. |
| tenant | TenantContext | Yes | Tenant isolation context with RBAC roles. |
| category | DataCategory | No | Determines which backend is used (default: DOCUMENT). |
| ttl | int | No | Time-to-live in seconds. None = no expiry. Cache entries default to config.default_cache_ttl_secs. |
| tags | Dict[str, str] | No | Arbitrary string key-value tags attached to the version record. |
| tier | DataTier | No | Override the router's automatic tier selection. |
| encrypt | bool | No | Encrypt the value before writing (default: True). np.ndarray values are never encrypted. |
| size_hint | int | No | Pre-computed byte size; used for routing size decisions when value is already serialised. |

Returns: PersistenceResult with success=True, the version record, and measured latency_ms.

Internal steps:

  1. Key and tenant validated by DataSanitizer.
  2. AccessControlManager.require_permission(WRITE) — raises PermissionError on denial.
  3. Value serialised to bytes (JSON for objects, tobytes() for arrays).
  4. DataSanitizer.validate_value_size() — rejects values >100 MB.
  5. StorageRouter.route(category, size_bytes) selects StorageType and DataTier.
  6. AES-256-GCM encryption applied (unless encrypt=False or value is np.ndarray).
  7. VersionManager.record() creates a SHA-256-checksummed version entry.
  8. StorageMetadata assembled with all routing, TTL, tier, and version information.
  9. Chosen backend's set() called.
  10. Metadata mirrored to DatabaseStorage (unless the primary backend already is DatabaseStorage).

# Store a document
result = await pm.store(
    key="doc:annual_report_2024",
    value=b"<report content>",
    tenant=tenant,
    category=DataCategory.DOCUMENT,
    ttl=86400,
    tags={"source": "upload", "year": "2024"},
)
print(f"Stored version {result.version.version_num}, "
      f"checksum {result.version.checksum[:16]}…")

# Store JSON metadata
await pm.store(
    key="meta:session:001",
    value={"query_count": 42, "last_query": "explain decorators"},
    tenant=tenant,
    category=DataCategory.METADATA,
)

retrieve

Universal read operation. Determines the correct backend, fetches, and decrypts the value.

async def retrieve(
    self,
    key:      str,
    tenant:   TenantContext,
    category: Optional[DataCategory] = None,
    decrypt:  bool = True,
) -> PersistenceResult

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| key | str | Yes | The key to retrieve. |
| tenant | TenantContext | Yes | Must have READ permission. |
| category | DataCategory | No | Overrides routing when specified. When None, DOCUMENT routing is used. |
| decrypt | bool | No | Attempt AES-256-GCM decryption of the retrieved bytes (default: True). |

Returns: PersistenceResult. On miss: success=False, error="Key not found". On TTL expiry: error="Key expired".

result = await pm.retrieve("doc:annual_report_2024", tenant,
                            category=DataCategory.DOCUMENT)
if result.success:
    content = result.data  # bytes
    print(f"From cache: {result.from_cache}")
else:
    print(f"Not found: {result.error}")
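
The distinction between the two miss reasons can be illustrated with a small in-memory store. This is a sketch under assumptions: TtlStoreSketch is hypothetical, expiry is checked lazily at read time, and the injectable clock exists only to make the behaviour easy to test.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class TtlStoreSketch:
    """Hypothetical in-memory store that reports why a read missed."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._items: Dict[str, Tuple[bytes, Optional[float]]] = {}

    def set(self, key: str, value: bytes, ttl: Optional[int] = None) -> None:
        # ttl=None means the entry never expires.
        expires_at = (self._clock() + ttl) if ttl is not None else None
        self._items[key] = (value, expires_at)

    def get(self, key: str) -> Tuple[Optional[bytes], Optional[str]]:
        """Return (value, error); exactly one of the two is None."""
        if key not in self._items:
            return None, "Key not found"
        value, expires_at = self._items[key]
        if expires_at is not None and self._clock() >= expires_at:
            # Assumption: expired entries are evicted lazily on read.
            del self._items[key]
            return None, "Key expired"
        return value, None
```

In this sketch a second read of an expired key reports "Key not found", because the first read already evicted it.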

delete

Remove a key from its backend.

async def delete(
    self,
    key:      str,
    tenant:   TenantContext,
    category: DataCategory = DataCategory.DOCUMENT,
) -> PersistenceResult

Requires DELETE permission. Routes to the same backend as store and retrieve.


Vector Operations


store_embedding

Store a single embedding vector with optional metadata.

async def store_embedding(
    self,
    key:    str,
    vector: np.ndarray,
    tenant: TenantContext,
    meta:   Optional[Dict[str, Any]] = None,
    ttl:    Optional[int] = None,
) -> PersistenceResult

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| key | str | Yes | Unique identifier for this embedding (e.g., "chunk:doc001:para3"). |
| vector | np.ndarray | Yes | 1-D float32 array. Length must match config.vector_config.extra["embedding_dim"] (default: 1536). |
| tenant | TenantContext | Yes | Requires WRITE on DataCategory.EMBEDDING. |
| meta | Dict[str, Any] | No | Arbitrary metadata stored alongside the vector (e.g., source document ID, chunk index). |
| ttl | int | No | TTL in seconds. |

Returns: PersistenceResult with storage_type=StorageType.VECTOR.

Dimension mismatch: If vector.shape[0] does not match the configured embedding dimension, store_embedding returns PersistenceResult.fail(...) with a descriptive error. No exception is raised.

import numpy as np

embedding = np.random.rand(1536).astype(np.float32)
result = await pm.store_embedding(
    key="chunk:doc001:para3",
    vector=embedding,
    tenant=tenant,
    meta={"doc_id": "doc:001", "chunk_index": 3, "text": "first 100 chars..."},
)

vector_search

Semantic similarity search over stored embeddings for a specific tenant.

async def vector_search(
    self,
    query_vector: np.ndarray,
    tenant:       TenantContext,
    k:            int = 10,
    filter_meta:  Optional[Dict[str, Any]] = None,
) -> List[Tuple[str, float, Dict[str, Any]]]

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| query_vector | np.ndarray | Yes | Query embedding. Must match the configured dimension. |
| tenant | TenantContext | Yes | Requires READ on DataCategory.EMBEDDING. |
| k | int | No | Number of results to return (default: 10). |
| filter_meta | Dict[str, Any] | No | Post-filter results by exact metadata key-value matches (AND semantics: all fields must match). |

Returns: List[Tuple[key, cosine_score, metadata]], sorted by descending similarity.

query_vec = embed("What are Python decorators?")  # your embed function

results = await pm.vector_search(
    query_vector=query_vec,
    tenant=tenant,
    k=5,
    filter_meta={"doc_id": "doc:001"},
)

for key, score, meta in results:
    print(f"{score:.4f}  {key}  chunk={meta['chunk_index']}")
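
For intuition, cosine-ranked search with an exact-match metadata filter can be sketched in plain Python as below. This is not how VectorStorage is implemented: the function names and the dictionary-based index are hypothetical, the standard library is used instead of NumPy to keep the example dependency-free, and applying the filter before truncating to k is an assumption.

```python
import math
from typing import Any, Dict, List, Optional, Sequence, Tuple

Index = Dict[str, Tuple[Sequence[float], Dict[str, Any]]]


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(index: Index, query: Sequence[float], k: int = 10,
           filter_meta: Optional[Dict[str, Any]] = None
           ) -> List[Tuple[str, float, Dict[str, Any]]]:
    scored = []
    for key, (vector, meta) in index.items():
        # AND semantics: skip the entry if any requested field differs.
        if filter_meta and any(meta.get(f) != v for f, v in filter_meta.items()):
            continue
        scored.append((key, cosine(query, vector), meta))
    # Highest similarity first, then keep the top k.
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]
```

A per-tenant index, as described for VectorStorage, would simply mean one such `index` dictionary per storage prefix.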

Batch Operations


batch_store

Store multiple items, chunked to avoid backend overload.

async def batch_store(
    self,
    items:      Dict[str, Any],
    tenant:     TenantContext,
    category:   DataCategory = DataCategory.DOCUMENT,
    ttl:        Optional[int] = None,
    chunk_size: Optional[int] = None,
) -> BatchResult

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| items | Dict[str, Any] | Yes | Mapping of {key: value} pairs to store. |
| tenant | TenantContext | Yes | Requires WRITE permission. |
| category | DataCategory | No | Applied uniformly to all items. |
| ttl | int | No | Applied uniformly to all items. |
| chunk_size | int | No | Items per concurrent chunk. Defaults to config.batch_chunk_size (100). |

Items within a chunk are stored concurrently; chunks are processed one after another. A failure on an individual item does not abort the batch; it is reported in BatchResult.errors.

documents = {
    "doc:001": b"content one",
    "doc:002": b"content two",
    "doc:003": b"content three",
}
batch = await pm.batch_store(documents, tenant, category=DataCategory.DOCUMENT)
print(f"{batch.succeeded}/{batch.total} stored in {batch.latency_ms:.1f}ms")
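
The chunking behaviour (concurrent inside a chunk, sequential across chunks, failures collected rather than raised) can be sketched with asyncio.gather. The function name and the store_one callback are hypothetical; the real batch_store returns a BatchResult rather than a tuple.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


async def batch_store_sketch(
    items: Dict[str, Any],
    store_one: Callable[[str, Any], Awaitable[None]],
    chunk_size: int = 100,
) -> Tuple[List[str], Dict[str, str]]:
    """Return (succeeded keys, {failed key: error message})."""
    succeeded: List[str] = []
    errors: Dict[str, str] = {}
    pairs = list(items.items())
    for start in range(0, len(pairs), chunk_size):
        chunk = pairs[start:start + chunk_size]
        # Items in one chunk run concurrently; the next chunk starts only
        # after this gather has finished.
        outcomes = await asyncio.gather(
            *(store_one(key, value) for key, value in chunk),
            return_exceptions=True,
        )
        for (key, _), outcome in zip(chunk, outcomes):
            if isinstance(outcome, BaseException):
                errors[key] = str(outcome)
            else:
                succeeded.append(key)
    return succeeded, errors
```

Passing return_exceptions=True is what keeps one failing item from cancelling its siblings in the same chunk.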

batch_retrieve

Retrieve multiple keys concurrently from a single backend.

async def batch_retrieve(
    self,
    keys:     List[str],
    tenant:   TenantContext,
    category: DataCategory = DataCategory.DOCUMENT,
) -> BatchResult

batch = await pm.batch_retrieve(["doc:001", "doc:002", "doc:003"], tenant)
for result in batch.results:
    if result.success:
        process(result.key, result.data)

Version History


get_version_history

Return the version chain for a key.

def get_version_history(
    self,
    key:    str,
    tenant: TenantContext,
    limit:  int = 20,
) -> List[VersionRecord]

Returns up to limit VersionRecord objects in reverse-chronological order (latest first). The in-memory VersionManager retains the last config.max_versions (default: 50) versions per key.

VersionRecord fields:

| Field | Type | Description |
| --- | --- | --- |
| version_id | str | UUID for the version |
| version_num | int | Monotonically increasing per key (1, 2, 3, …) |
| checksum | str | SHA-256 of the stored data |
| parent_id | Optional[str] | version_id of the preceding version |
| created_at | datetime | Creation timestamp |
| created_by | Optional[str] | tenant.user_id at write time |
| size_bytes | int | Serialised size of the value |
| tags | Dict[str, str] | Tags supplied at write time |

history = pm.get_version_history("doc:001", tenant, limit=5)
for v in history:
    print(f"v{v.version_num}  {v.version_id[:8]}  "
          f"{v.created_at.isoformat()}  {v.size_bytes}B")

rollback_to_version

Restore a key to a specific historical version.

async def rollback_to_version(
    self,
    key:        str,
    tenant:     TenantContext,
    version_id: str,
    category:   DataCategory = DataCategory.DOCUMENT,
) -> PersistenceResult

Requires WRITE permission. Looks up the raw snapshot bytes for version_id in VersionManager, then calls store() with encrypt=False (the snapshot is already in its original form). Returns PersistenceResult.fail if version_id is not found in history (pruned beyond max_versions).

result = await pm.rollback_to_version("doc:001", tenant,
                                       version_id="3f2a1b9c-...")
print(f"Rolled back to v{result.version.version_num}")

Backup


create_backup

Create a point-in-time snapshot of all keys for a tenant on a specific backend.

async def create_backup(
    self,
    tenant:       TenantContext,
    storage_type: StorageType = StorageType.RELATIONAL,
    prefix:       str = "",
    tags:         Optional[Dict[str, str]] = None,
) -> Snapshot

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| tenant | TenantContext | Yes | Requires BACKUP permission (role: backup_op or admin). |
| storage_type | StorageType | No | Which backend to snapshot (default: RELATIONAL). |
| prefix | str | No | Only snapshot keys with this prefix. "" = all keys. |
| tags | Dict[str, str] | No | Metadata tags attached to the snapshot manifest. |

Returns: Snapshot with fields: snapshot_id, tenant_id, namespace, created_at, key_count, size_bytes, storage_path, storage_type, tags.

The snapshot is written to {config.backup_base_path}/{snapshot_id}/data.pkl with a manifest.json sidecar.

snap = await pm.create_backup(
    tenant=tenant,
    storage_type=StorageType.RELATIONAL,
    tags={"reason": "pre-migration", "env": "prod"},
)
print(f"Snapshot {snap.snapshot_id}: {snap.key_count} keys, {snap.size_bytes} bytes")
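
The on-disk layout described above (a data.pkl file plus a manifest.json sidecar in a directory named after the snapshot) might look like the following sketch. The function names and the exact manifest fields are assumptions chosen to mirror the documented Snapshot fields; this is not the BackupManager code.

```python
import json
import pickle
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict


def write_snapshot(base_path: Path, tenant_id: str,
                   data: Dict[str, bytes]) -> Path:
    """Write {base_path}/{snapshot_id}/data.pkl and manifest.json."""
    snapshot_id = str(uuid.uuid4())
    snapshot_dir = base_path / snapshot_id
    snapshot_dir.mkdir(parents=True)
    payload = pickle.dumps(data)
    (snapshot_dir / "data.pkl").write_bytes(payload)
    manifest = {
        "snapshot_id": snapshot_id,
        "tenant_id": tenant_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "key_count": len(data),
        "size_bytes": len(payload),
    }
    (snapshot_dir / "manifest.json").write_text(json.dumps(manifest, indent=2))
    return snapshot_dir


def read_snapshot(snapshot_dir: Path) -> Dict[str, bytes]:
    # pickle can execute code while loading, so only load snapshots
    # that come from a trusted location.
    return pickle.loads((snapshot_dir / "data.pkl").read_bytes())
```

Keeping the manifest as JSON means snapshot listings can be built without unpickling any data file.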

restore_backup

Restore a snapshot into a backend.

async def restore_backup(
    self,
    snapshot_id:  str,
    tenant:       TenantContext,
    storage_type: StorageType = StorageType.RELATIONAL,
    overwrite:    bool = True,
) -> BatchResult

Requires RESTORE permission. If overwrite=False, only keys not already present are written.

batch = await pm.restore_backup(snap.snapshot_id, tenant, overwrite=True)
print(f"Restored {batch.succeeded} keys")

list_backups
def list_backups(self, tenant_id: Optional[str] = None) -> List[Snapshot]

Returns all known snapshots, sorted newest-first. Filter by tenant_id if provided.


Transactions


transaction (context manager)

Atomic multi-operation transaction across any combination of backends.

@asynccontextmanager
async def transaction(self) -> AsyncIterator[str]

Usage:

async with pm.transaction() as txn_id:
    await pm.txn_set(txn_id, "key1", value1, tenant, "kv")
    await pm.txn_set(txn_id, "key2", value2, tenant, "db")
    await pm.txn_delete(txn_id, "old_key", tenant, "kv")
# All operations committed atomically on exit.
# On any exception, compensating deletes are executed in reverse order.

On exception, the context manager calls TransactionManager.rollback() which executes compensating operations (delete for each successful set) in reverse order.

Transaction rollback limitation: If a staged operation is a delete, its data cannot be recovered by compensating operations (best-effort only). Avoid using txn_delete inside sensitive transactions where full atomicity is required; use a txn_set with a sentinel value instead.
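
The compensation model and its limitation can be shown with a dictionary standing in for a backend. This is a simplified sketch: run_transaction and the tuple-based operation format are hypothetical, and a single dict replaces the multiple backends the real TransactionManager coordinates.

```python
from typing import Any, Dict, List, Tuple

Op = Tuple[str, str, Any]  # (kind, key, value); kind is "set" or "delete"


def run_transaction(store: Dict[str, Any], ops: List[Op]) -> bool:
    """Apply ops in order; on failure, undo successful sets in reverse."""
    applied_sets: List[str] = []
    try:
        for kind, key, value in ops:
            if kind == "set":
                store[key] = value
                applied_sets.append(key)
            elif kind == "delete":
                # The old value is not kept, so compensation cannot restore it.
                store.pop(key, None)
            else:
                raise ValueError(f"unknown operation {kind!r}")
    except Exception:
        # Compensating deletes, newest first.
        for key in reversed(applied_sets):
            store.pop(key, None)
        return False
    return True
```

If the transaction fails after a delete has run, the deleted key stays gone, which is the best-effort behaviour the limitation note warns about.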


txn_set

Stage a set operation within an open transaction.

async def txn_set(
    self,
    txn_id:       str,
    key:          str,
    value:        Any,
    tenant:       TenantContext,
    backend_name: str,           # "kv" | "vector" | "db" | "object"
    metadata:     Optional[StorageMetadata] = None,
) -> None

txn_delete

Stage a delete operation within an open transaction.

async def txn_delete(
    self,
    txn_id:       str,
    key:          str,
    tenant:       TenantContext,
    backend_name: str,
) -> None

Backend name mapping:

| Name | Backend |
| --- | --- |
| "kv" | KeyValueStorage |
| "vector" | VectorStorage |
| "db" | DatabaseStorage |
| "object" | ObjectStorage |

ACL & Audit


set_access_policy

Set a fine-grained access policy for a tenant namespace.

def set_access_policy(self, policy: AccessPolicy) -> None

See Section 7 for full policy configuration.


get_audit_log

Retrieve the in-process audit log.

def get_audit_log(
    self,
    tenant_id: Optional[str] = None,
    limit:     int = 1000,
) -> List[Dict[str, Any]]

Returns the last limit audit entries, optionally filtered by tenant_id. Each entry is a dict with keys: ts, tenant_id, namespace, user_id, op, category, granted, reason.


Routing Customization


add_routing_rule

Register a custom routing rule at runtime.

def add_routing_rule(self, rule: RoutingRule) -> None

See Section 5.2 for rule configuration.


Metrics


get_metrics

Return per-backend operation counters and latency statistics.

def get_metrics(self) -> Dict[str, Any]

m = pm.get_metrics()
# {
#   "key_value":       {"reads": 420, "writes": 85, "deletes": 12,
#                       "errors": 0, "latency_total_ms": 12400.0,
#                       "avg_latency_ms": 25.3},
#   "vector":          {...},
#   "relational":      {...},
#   "object":          {...},
#   "lifecycle_stats": {"sweeps": 14, "evicted": 3, "promoted": 0},
# }

estimate_storage_cost

Calculate relative storage cost for capacity planning.

def estimate_storage_cost(
    self,
    tier:    DataTier,
    size_gb: float,
    months:  float = 1.0,
) -> float

Returns a relative cost unit (HOT = 1.0 per GB/month). Use to compare tier migration savings.

hot_cost  = pm.estimate_storage_cost(DataTier.HOT,    size_gb=100, months=12)
cold_cost = pm.estimate_storage_cost(DataTier.COLD,   size_gb=100, months=12)
savings_pct = (hot_cost - cold_cost) / hot_cost * 100
print(f"Moving 100GB from HOT to COLD saves {savings_pct:.0f}%")
# Moving 100GB from HOT to COLD saves 95%
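
The arithmetic behind the example can be reproduced with the tier multipliers from Section 3.2. The linear formula (multiplier × GB × months) is an assumption that is consistent with the 95% figure above; the dictionary and function name are hypothetical.

```python
# Relative cost per GB per month, taken from the Data Tiers table.
TIER_MULTIPLIER = {"HOT": 1.0, "WARM": 0.3, "COLD": 0.05, "FROZEN": 0.005}


def estimate_cost(tier: str, size_gb: float, months: float = 1.0) -> float:
    """Relative cost units, assuming cost scales linearly with size and time."""
    return TIER_MULTIPLIER[tier] * size_gb * months
```

Under this assumption, 100 GB for 12 months costs 1200 units on HOT and 60 units on COLD, which is where the 95% saving comes from.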

5.2 StorageRouter

Evaluates a prioritised rule list to select the backend for each operation.

from fennec_memory.persistence import StorageRouter, RoutingRule, DataCategory, StorageType, DataTier

router = StorageRouter()
storage_type, tier = router.route(DataCategory.CACHE)
# → (StorageType.KEY_VALUE, DataTier.HOT)

Default Rules (in priority order)

| Priority | Rule Name | Condition | Target |
| --- | --- | --- | --- |
| 10 | embedding_to_vector | category == EMBEDDING | VECTOR |
| 20 | document_to_object | category == DOCUMENT or size > 1 MB | OBJECT |
| 30 | cache_to_kv | category == CACHE | KEY_VALUE |
| 40 | metadata_to_db | category in {METADATA, AUDIT, CHECKPOINT} | RELATIONAL |
| 999 | default_kv | catch-all | KEY_VALUE |

RoutingRule

@dataclass
class RoutingRule:
    name:        str
    priority:    int          # lower = higher priority
    condition:   Callable[[DataCategory, int, Dict[str, Any]], bool]
    target:      StorageType
    tier_hint:   DataTier = DataTier.HOT
    description: str = ""

add_rule / route

def add_rule(self, rule: RoutingRule) -> None
def route(self, category: DataCategory, size_bytes: int = 0,
          extra: Optional[Dict[str, Any]] = None) -> Tuple[StorageType, DataTier]
def route_with_cost(self, category: DataCategory, size_bytes: int,
                    access_frequency: float = 1.0) -> Tuple[StorageType, DataTier]

route_with_cost selects the tier based on access_frequency (reads/hour): >100 → HOT, >10 → WARM, >0.1 → COLD, else → FROZEN.
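
The thresholds translate directly into a chain of comparisons. The function name is hypothetical and tiers are plain strings here; note that each comparison is strict, so exactly 100 reads/hour maps to WARM.

```python
def tier_for_frequency(reads_per_hour: float) -> str:
    """Map an access frequency (reads/hour) to a tier name."""
    if reads_per_hour > 100:
        return "HOT"
    if reads_per_hour > 10:
        return "WARM"
    if reads_per_hour > 0.1:
        return "COLD"
    return "FROZEN"
```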

# Add a rule that sends checkpoint data > 10MB to object storage
pm.add_routing_rule(RoutingRule(
    name="large_checkpoint_to_object",
    priority=5,
    condition=lambda cat, size, _: cat == DataCategory.CHECKPOINT and size > 10_000_000,
    target=StorageType.OBJECT,
    tier_hint=DataTier.WARM,
    description="Large checkpoints go to object storage",
))

5.3 VersionManager

Manages in-memory version history per scoped key.

from fennec_memory.persistence import VersionManager, VersionRecord

| Method | Signature | Description |
| --- | --- | --- |
| record | (scoped_key, data, created_by, tags) -> DataVersion | Create and persist a new version. Prunes oldest if over max_versions. |
| get_history | (scoped_key, limit) -> List[VersionRecord] | Latest-first version list. |
| get_snapshot | (scoped_key, version_id) -> Optional[bytes] | Raw bytes for a specific version. |
| rollback | (scoped_key, version_id) -> Optional[bytes] | Alias for get_snapshot. |
| latest_version | (scoped_key) -> Optional[VersionRecord] | Most recent VersionRecord. |
| diff_versions | (scoped_key, v1_id, v2_id) -> Dict | Size delta and equality check between two versions. |

VersionRecord fields: version_id, version_num, checksum (SHA-256), parent_id, created_at, created_by, size_bytes, tags.

diff = pm._versions.diff_versions(
    tenant.scoped_key("doc:001"), old_version_id, new_version_id
)
print(f"Size delta: {diff['size_delta']} bytes, identical: {diff['same']}")
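
To make the pruning and diff behaviour concrete, here is a small self-contained sketch of an in-memory version history. `TinyVersionStore` and `Version` are hypothetical stand-ins, not the module's `VersionManager`; the sketch assumes versions are numbered from 1 and compared by SHA-256 checksum.

```python
import hashlib
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, List


@dataclass
class Version:
    version_num: int
    checksum: str    # SHA-256 hex digest of the payload
    size_bytes: int
    data: bytes


class TinyVersionStore:
    def __init__(self, max_versions: int = 50) -> None:
        self._max = max_versions
        self._history: Dict[str, Deque[Version]] = defaultdict(deque)
        self._counter: Dict[str, int] = defaultdict(int)

    def record(self, key: str, data: bytes) -> Version:
        self._counter[key] += 1
        version = Version(self._counter[key], hashlib.sha256(data).hexdigest(), len(data), data)
        history = self._history[key]
        history.append(version)
        while len(history) > self._max:
            history.popleft()  # prune the oldest version first
        return version

    def history(self, key: str) -> List[Version]:
        return list(reversed(self._history[key]))  # latest first

    def diff(self, key: str, old_num: int, new_num: int) -> dict:
        by_num = {v.version_num: v for v in self._history[key]}
        old, new = by_num[old_num], by_num[new_num]
        return {"size_delta": new.size_bytes - old.size_bytes,
                "same": old.checksum == new.checksum}


store = TinyVersionStore(max_versions=2)
for payload in (b"v1", b"v2 longer", b"v2 longer"):
    store.record("acme:prod:doc:001", payload)

print([v.version_num for v in store.history("acme:prod:doc:001")])  # [3, 2]
print(store.diff("acme:prod:doc:001", 2, 3))  # {'size_delta': 0, 'same': True}
```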

5.4 BackupManager

Point-in-time snapshot management.

from fennec_memory.persistence import BackupManager, Snapshot

| Method | Signature | Description |
| --- | --- | --- |
| create_snapshot | (backend, tenant, storage_type, prefix, tags) -> Snapshot | Capture all matching keys into a pickle file + manifest. |
| restore_snapshot | (snapshot_id, backend, tenant, overwrite) -> BatchResult | Restore snapshot data into a backend. |
| list_snapshots | (tenant_id=None) -> List[Snapshot] | All snapshots, newest-first. |
| delete_snapshot | (snapshot_id) -> bool | Remove snapshot files from disk. |

Snapshot fields: snapshot_id, tenant_id, namespace, created_at, key_count, size_bytes, storage_path, storage_type, tags.


5.5 TransactionManager

Cross-backend 2-phase-commit-lite.

from fennec_memory.persistence import TransactionManager

| Method | Signature | Description |
| --- | --- | --- |
| begin | () -> str | Open a new transaction; returns txn_id. |
| add_operation | (txn_id, operation_dict) -> None | Stage one operation. |
| commit | (txn_id) -> bool | Execute all staged ops. On partial failure, runs compensating deletes in reverse. Returns True on full success. |
| rollback | (txn_id) -> bool | Mark transaction as rolled back without executing. |

Use PersistenceManager.transaction() (the async context manager) rather than calling these methods directly.
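
The commit behaviour described in the table (apply in order, then undo in reverse on failure) can be sketched with plain dicts standing in for backends. Everything here, including `commit`, `Op`, and `FailingStore`, is illustrative and not the module's implementation.

```python
from typing import Any, List, MutableMapping, Tuple

Op = Tuple[MutableMapping[str, Any], str, Any]  # (backend, key, value)


class FailingStore(dict):
    """Stand-in backend whose writes always fail."""

    def __setitem__(self, key, value):
        raise IOError("backend unavailable")


def commit(ops: List[Op]) -> bool:
    """Apply writes sequentially; on failure, delete the applied keys in reverse order."""
    applied: List[Tuple[MutableMapping[str, Any], str]] = []
    for backend, key, value in ops:
        try:
            backend[key] = value
        except Exception:
            for done_backend, done_key in reversed(applied):
                done_backend.pop(done_key, None)  # compensating delete
            return False
        applied.append((backend, key))
    return True


object_store: dict = {}
good_db: dict = {}
ok = commit([(object_store, "doc:1", b"bytes"), (good_db, "meta:1", {"v": 1})])
failed = commit([(object_store, "doc:2", b"bytes"), (FailingStore(), "meta:2", {"v": 2})])
print(ok, failed, sorted(object_store))  # True False ['doc:1']
```

A compensating delete removes the key outright; it does not bring back a value that the failed transaction overwrote, which is why this scheme is weaker than a true two-phase commit.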


5.6 LifecycleManager

Background data lifecycle enforcement.

from fennec_memory.persistence import LifecycleManager

| Method | Signature | Description |
| --- | --- | --- |
| start | () -> None | Launch the background sweep task. |
| stop | () -> None | Signal the sweep loop to exit. |
| estimate_cost | (tier, size_gb, months) -> float | Relative storage cost for planning. |
| recommend_tier | (access_count, age_days, size_bytes) -> DataTier | Recommend a tier based on access patterns. |
| get_stats | () -> Dict[str, Any] | Returns {"sweeps": N, "evicted": N, "promoted": N}. |

Sweep interval is set via config.lifecycle_sweep_secs (default: 60 seconds). Each sweep:

  1. Iterates KeyValueStorage._meta_index for expired entries and deletes them.
  2. Increments stats["evicted"] for each deleted entry.

Tier recommendation thresholds (recommend_tier):

| Access Rate | Recommended Tier |
| --- | --- |
| > 10 accesses/day | HOT |
| > 1 access/day | WARM |
| > 0.1 accesses/day | COLD |
| ≤ 0.1 accesses/day | FROZEN |

recommendation = pm._lifecycle.recommend_tier(
    access_count=30, age_days=10, size_bytes=500_000
)
# → DataTier.WARM (3 reads/day)

5.7 Storage Backends

All four backends implement BasePersistence and share the same method signatures. They can be used directly for advanced scenarios or as replaceable components.

from fennec_memory.persistence import KeyValueStorage, VectorStorage, DatabaseStorage, ObjectStorage

BasePersistence — Shared Interface

Every backend implements:

| Method | Description |
| --- | --- |
| connect() | Establish connection(s). |
| disconnect() | Gracefully close all connections. |
| health_check() -> bool | Reachability check. |
| get(key, tenant) -> PersistenceResult | Retrieve with TTL check and access tracking. |
| set(key, value, tenant, metadata) -> PersistenceResult | Upsert with versioning. |
| delete(key, tenant) -> PersistenceResult | Remove key. |
| exists(key, tenant) -> bool | Check existence without loading data. |
| list_keys(prefix, tenant, limit, offset) -> List[str] | Paginated key enumeration. |
| batch_get(keys, tenant) -> BatchResult | Concurrent multi-get. |
| batch_set(items, tenant, metadata) -> BatchResult | Concurrent multi-set. |
| batch_delete(keys, tenant) -> BatchResult | Concurrent multi-delete. |
| expire(key, tenant, ttl_seconds) -> bool | Set/update TTL (KV only). |
| get_ttl(key, tenant) -> Optional[int] | Remaining TTL in seconds (KV only). |
| get_metrics() -> Dict[str, Any] | Read/write/delete counts and avg latency. |

VectorStorage-Specific

async def search(
    self,
    query_vector: np.ndarray,
    tenant:       TenantContext,
    k:            int = 10,
    filter_meta:  Optional[Dict[str, Any]] = None,
) -> List[Tuple[str, float, Dict[str, Any]]]

Each tenant namespace gets an isolated VectorIndex instance. The index uses L2-normalised cosine similarity.

DatabaseStorage-Specific

async def get_version_history(self, key, tenant, limit=20) -> List[Dict[str, Any]]
async def restore_version(self, key, tenant, version_id) -> PersistenceResult

DatabaseStorage persists every version snapshot to a version_history table alongside the main persistence_store table.


6. Configuration System

PersistenceManagerConfig

import os

from fennec_memory.persistence import PersistenceManagerConfig, StorageConfig, StorageType

config = PersistenceManagerConfig(
    # Backend configs
    kv_config=StorageConfig(storage_type=StorageType.KEY_VALUE),
    vector_config=StorageConfig(
        storage_type=StorageType.VECTOR,
        extra={"embedding_dim": 1536},
    ),
    db_config=StorageConfig(
        storage_type=StorageType.RELATIONAL,
        extra={"db_path": "/var/data/rag_metadata.db"},
    ),
    object_config=StorageConfig(
        storage_type=StorageType.OBJECT,
        extra={"base_path": "/var/data/rag_objects"},
    ),

    # Security
    master_key=bytes.fromhex(os.environ["MASTER_KEY_HEX"]),  # 32 bytes exactly
    enable_encryption=True,

    # Lifecycle
    max_versions=50,
    lifecycle_sweep_secs=60,
    backup_base_path="/var/data/rag_backups",

    # Performance
    default_cache_ttl_secs=3600,
    batch_chunk_size=100,
)

Full field reference:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| kv_config | StorageConfig | in-memory defaults | Key-value backend config. |
| vector_config | StorageConfig | embedding_dim=1536 | Vector backend config. |
| db_config | StorageConfig | db_path=":memory:" | Relational backend config. |
| object_config | StorageConfig | base_path="/tmp/rag_objects" | Object storage config. |
| master_key | bytes | dev key (32B) | Master encryption key. Must be exactly 32 bytes. Read from environment in production. |
| enable_encryption | bool | True | Global encryption toggle. |
| max_versions | int | 50 | Maximum versions retained per key in VersionManager. |
| lifecycle_sweep_secs | int | 60 | Background sweep interval. |
| backup_base_path | str | "/tmp/rag_backups" | Snapshot storage directory. |
| default_cache_ttl_secs | int | 3600 | TTL applied to DataCategory.CACHE entries when no TTL is specified. |
| batch_chunk_size | int | 100 | Items per concurrent chunk in batch_store. |

StorageConfig

@dataclass
class StorageConfig:
    storage_type:    StorageType
    host:            str = "localhost"
    port:            int = 0
    database:        str = ""
    username:        str = ""
    password:        str = ""
    ssl:             bool = False
    pool_size:       int = 10
    max_overflow:    int = 20
    connect_timeout: int = 5
    command_timeout: int = 30
    retry_attempts:  int = 3
    retry_delay_ms:  int = 100
    extra:           Dict[str, Any] = field(default_factory=dict)

extra is backend-specific:

  • VectorStorage: {"embedding_dim": 1536}
  • DatabaseStorage: {"db_path": "/path/to/db.sqlite"}
  • ObjectStorage: {"base_path": "/path/to/objects"}

Environment Variable Reference

| Variable | Used By | Description |
| --- | --- | --- |
| PERSISTENCE_MASTER_KEY | EncryptionEngine | Base64-encoded 32-byte master key. If absent, falls back to a deterministic dev key. Never use the dev key in production. |

Generating a Master Key

from fennec_memory.persistence import EncryptionEngine
key_b64 = EncryptionEngine.generate_master_key()
print(key_b64)  # store in Vault / KMS

7. Security Model

7.1 Encryption — AES-256-GCM

EncryptionEngine uses AES-256-GCM (authenticated encryption) with a per-tenant derived key.

Key derivation:

tenant_key = PBKDF2-SHA256(master_key, salt=tenant_id, iterations=100_000, length=32)

Derived keys are cached in memory to avoid re-deriving on every operation.

Blob format:

[1 byte: key_version] [12 bytes: GCM nonce] [N bytes: ciphertext + 16-byte GCM tag]

The GCM tag authenticates the ciphertext. Any bit-flip or tampered byte causes decrypt() to raise an exception before returning data. The version byte enables future key rotation without re-encrypting all blobs.
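
The key-derivation formula and blob layout above can be exercised with the standard library alone. This sketch assumes the master key is used as the PBKDF2 password and the UTF-8 encoded tenant_id as the salt; `derive_tenant_key` and `split_blob` are hypothetical helper names, and no actual AES-GCM encryption is performed here.

```python
import hashlib
from typing import Tuple

NONCE_LEN = 12  # GCM nonce size from the blob format
TAG_LEN = 16    # GCM tag size, appended to the ciphertext


def derive_tenant_key(master_key: bytes, tenant_id: str) -> bytes:
    """PBKDF2-SHA256 with 100,000 iterations, producing a 32-byte key."""
    if len(master_key) != 32:
        raise ValueError("master key must be exactly 32 bytes")
    return hashlib.pbkdf2_hmac(
        "sha256", master_key, tenant_id.encode("utf-8"), 100_000, dklen=32
    )


def split_blob(blob: bytes) -> Tuple[int, bytes, bytes]:
    """Split a blob into (key_version, nonce, ciphertext_with_tag)."""
    if len(blob) < 1 + NONCE_LEN + TAG_LEN:
        raise ValueError("blob too short to contain version, nonce and tag")
    return blob[0], blob[1:1 + NONCE_LEN], blob[1 + NONCE_LEN:]


master = bytes(range(32))  # demo key only; never use a predictable key in practice
key_acme = derive_tenant_key(master, "acme")
key_other = derive_tenant_key(master, "globalcorp")
print(len(key_acme), key_acme != key_other)  # 32 True

version, nonce, body = split_blob(bytes([1]) + b"\x00" * 12 + b"c" * 20)
print(version, len(nonce), len(body))  # 1 12 20
```

Caching the derived key per tenant, as the text describes, avoids paying for the 100,000 iterations on every operation.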

np.ndarray values (embeddings) are never encrypted — they must be stored as raw floats for the vector index to function.

from fennec_memory.persistence import EncryptionEngine
import base64

# Generate a new master key
master_key_b64 = EncryptionEngine.generate_master_key()
print(f"Save this securely: {master_key_b64}")

# Direct usage
engine = EncryptionEngine(master_key=base64.b64decode(master_key_b64))

blob = engine.encrypt(b"sensitive payload", tenant_id="acme")
data = engine.decrypt(blob, tenant_id="acme")

# With Additional Authenticated Data (AAD)
blob = engine.encrypt(b"payload", tenant_id="acme", aad=b"context")
data = engine.decrypt(blob, tenant_id="acme", aad=b"context")

# JSON convenience wrappers
blob = engine.encrypt_json({"key": "value"}, tenant_id="acme")
obj  = engine.decrypt_json(blob, tenant_id="acme")

7.2 Role-Based Access Control

Built-in roles and their permissions:

Role READ WRITE DELETE LIST BACKUP RESTORE ADMIN
reader
writer
editor
backup_op
admin

Assign roles via TenantContext.roles:

tenant = TenantContext(
    tenant_id="acme",
    namespace="prod",
    user_id="alice",
    roles=["editor"],   # can read, write, delete, list
)

Multiple roles are combined — a user with ["reader", "backup_op"] gets the union of both permission sets.

Open-by-default: If TenantContext.roles is empty and no AccessPolicy is registered for the tenant, the system permits all operations by default. Enforcement activates as soon as at least one role is assigned to any TenantContext or an AccessPolicy is set for the tenant. Explicitly assign roles or set a policy for all tenants in production.

7.3 Access Policies

Fine-grained policies override role-level permissions:

from fennec_memory.persistence import AccessPolicy, Permission, DataCategory

policy = AccessPolicy(
    tenant_id="acme",
    namespace="prod",
    allowed_ops={Permission.READ, Permission.WRITE, Permission.LIST},
    denied_ops={Permission.DELETE},          # deny delete even for editors
    data_categories={DataCategory.DOCUMENT, DataCategory.CACHE},
    ip_whitelist=["10.0.0.0/8"],            # restrict to internal network
    rate_limit=100,                          # max 100 operations per second
)

pm.set_access_policy(policy)

AccessPolicy fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| tenant_id | str | Required | Target tenant. |
| namespace | str | Required | Target namespace. |
| allowed_ops | Set[Permission] | Required | Permitted operations. |
| denied_ops | Set[Permission] | set() | Explicitly denied operations (takes precedence over role grants). |
| data_categories | Set[DataCategory] | all categories | Restrict policy to specific data categories. |
| ip_whitelist | List[str] | [] | If non-empty, only requests from these CIDR ranges are permitted. |
| rate_limit | Optional[int] | None | Maximum operations per second via token-bucket limiter. |

Authorization decision order:

  1. Resolve effective permissions from tenant.roles.
  2. Check op not in effective_perms → deny with reason "insufficient_role".
  3. Check AccessPolicy.permits(op, category) if a policy exists → deny with reason "policy_denied".
  4. Check rate limiter → deny with reason "rate_limited".
  5. Grant and record to audit log.
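
The decision order above can be sketched as a single function that returns the same reason strings. The role-to-permission mapping is an assumption made for illustration: only the editor permissions are stated in this document, and the reader row is a guess. The open-by-default case and data-category checks are left out.

```python
from typing import FrozenSet, Iterable, Optional, Set, Tuple

# Hypothetical mapping; only the "editor" entry is taken from the text.
ROLE_PERMS = {
    "reader": {"read", "list"},
    "editor": {"read", "write", "delete", "list"},
}


def authorize(
    op: str,
    roles: Iterable[str],
    policy_allowed: Optional[Set[str]] = None,  # None means no policy is registered
    policy_denied: FrozenSet[str] = frozenset(),
    rate_ok: bool = True,
) -> Tuple[bool, str]:
    # Steps 1-2: union of role permissions, then the role check.
    effective = set().union(*(ROLE_PERMS.get(role, set()) for role in roles))
    if op not in effective:
        return False, "insufficient_role"
    # Step 3: policy check; explicit denials win over grants.
    if policy_allowed is not None and (op in policy_denied or op not in policy_allowed):
        return False, "policy_denied"
    # Step 4: rate limiter.
    if not rate_ok:
        return False, "rate_limited"
    # Step 5: grant (a real system would also write the audit record here).
    return True, "ok"


print(authorize("delete", ["reader"]))
print(authorize("delete", ["editor"], {"read", "write", "list"}, frozenset({"delete"})))
print(authorize("write", ["reader", "editor"]))
```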

7.4 Rate Limiting

RateLimiter uses a token-bucket algorithm. The bucket refills at ops_per_second tokens per second and allows bursts up to capacity operations.

from fennec_memory.persistence import RateLimiter

limiter = RateLimiter(ops_per_second=100)
if limiter.allow():
    # proceed
    pass

Rate limiters are instantiated automatically when a policy with rate_limit is registered.
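
For readers unfamiliar with the algorithm, a token bucket can be sketched in a few lines. `TokenBucket` below is a hypothetical stand-in for `RateLimiter`; the `capacity` default and the injectable `clock` parameter are assumptions added so the example runs deterministically.

```python
import time
from typing import Callable, Optional


class TokenBucket:
    def __init__(
        self,
        ops_per_second: float,
        capacity: Optional[float] = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.rate = ops_per_second
        self.capacity = capacity if capacity is not None else ops_per_second
        self._tokens = self.capacity  # start full so an initial burst is allowed
        self._clock = clock
        self._last = clock()

    def allow(self) -> bool:
        now = self._clock()
        # Refill in proportion to elapsed time, never beyond capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False


fake_now = [0.0]  # manual clock for a repeatable demo
bucket = TokenBucket(ops_per_second=2, capacity=2, clock=lambda: fake_now[0])
burst = [bucket.allow() for _ in range(3)]
fake_now[0] += 0.5  # half a second at 2 tokens/second refills one token
after_wait = bucket.allow()
print(burst, after_wait)  # [True, True, False] True
```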

7.5 Input Validation (DataSanitizer)

DataSanitizer is applied before every store() and retrieve() call. It can also be used directly:

from fennec_memory.persistence import DataSanitizer

# Validate and clean a key (raises ValueError on violation)
clean_key = DataSanitizer.validate_key("my::key")

# Validate value size (raises ValueError if > 100 MB)
DataSanitizer.validate_value_size(serialised_bytes)

# Validate tenant context (raises ValueError on invalid tenant_id or namespace)
DataSanitizer.validate_tenant(tenant)

| Validation | Rule |
| --- | --- |
| Key type | Must be a non-empty str |
| Key length | ≤ 512 characters |
| Key characters | No \x00, \r, \n |
| Key patterns | No ../, ..\, \x00 (path traversal prevention) |
| Value size | ≤ 100 MB |
| Tenant context | tenant_id and namespace must be non-empty strings |
| Tenant characters | No .., /, \, \x00 in tenant_id or namespace |

Keys are stripped of leading/trailing whitespace before use.
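
The key rules in the table translate into a short validator. This is a sketch of the rules as listed, not the `DataSanitizer` source; in particular it assumes whitespace is stripped before the other checks run, and `is_valid_key` is a helper added for the demo.

```python
MAX_KEY_LEN = 512
FORBIDDEN = ("\x00", "\r", "\n", "../", "..\\")


def validate_key(key: object) -> str:
    """Return the cleaned key, or raise ValueError if any rule is violated."""
    if not isinstance(key, str):
        raise ValueError("key must be a str")
    key = key.strip()  # assumption: stripping happens before the remaining checks
    if not key:
        raise ValueError("key must be non-empty")
    if len(key) > MAX_KEY_LEN:
        raise ValueError("key exceeds 512 characters")
    for fragment in FORBIDDEN:
        if fragment in key:
            raise ValueError(f"key contains forbidden sequence {fragment!r}")
    return key


def is_valid_key(key: object) -> bool:
    try:
        validate_key(key)
    except ValueError:
        return False
    return True


print(validate_key("  my::key "))       # my::key
print(is_valid_key("../etc/passwd"))    # False
print(is_valid_key("line\nbreak"))      # False
```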

7.6 Audit Log

Every access control decision (granted or denied) is recorded with:

{
    "ts":        "2026-04-21T09:30:00.123+00:00",
    "tenant_id": "acme",
    "namespace": "prod",
    "user_id":   "alice",
    "op":        "write",
    "category":  "document",
    "granted":   True,
    "reason":    "ok",   # or "insufficient_role" | "policy_denied" | "rate_limited"
}

Retrieve via pm.get_audit_log(tenant_id="acme", limit=500).

In production, forward the audit log to an append-only SIEM or database table by replacing or wrapping AccessControlManager._record_audit.


8. Storage Backends — Deep Dive

KeyValueStorage (in-memory / Redis shape)

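Current implementation: Async in-memory dict with a background TTL reaper task (1-second sweep). Concurrent coroutine access is serialised with asyncio.Lock, which prevents interleaved updates within a single event loop but is not safe across OS threads.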

Production swap: Replace _store dict operations with aioredis calls. The get, set, delete, expire, get_ttl, and batch methods map 1:1 to Redis commands.

Key features:

  • TTL enforcement via a background coroutine that sweeps _ttl_index every second.
  • get_ttl() returns remaining seconds.
  • expire() updates an existing key's TTL without touching its value.
  • Metadata indexed in _meta_index for lifecycle management.

When to use: API response caching, session data, short-lived computation results, rate-limit counters.


VectorStorage (NumPy / FAISS shape)

Current implementation: Pure-NumPy cosine similarity index (VectorIndex). L2-normalises all vectors on insert. Searches via matrix multiplication (O(n·d)).

Production swap: Replace VectorIndex with faiss.IndexFlatIP for CPU, faiss.IndexIVFFlat for large-scale approximate search, or pgvector for PostgreSQL-backed persistence.

Key features:

  • One VectorIndex instance per tenant.storage_prefix() — strict isolation.
  • add() is idempotent: re-adding an existing vec_id updates in place.
  • filter_meta post-filters results by exact metadata match before returning k results (oversamples 3× internally).
  • Embedding dimension must match config.vector_config.extra["embedding_dim"].
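
The features listed above (normalise on insert, idempotent add, dot-product search, 3× oversampling before metadata filtering) can be illustrated without NumPy. `TinyVectorIndex` is a hypothetical pure-Python stand-in that uses lists instead of arrays, so it shows the logic rather than the performance characteristics of the real index.

```python
import math
from typing import Any, Dict, List, Optional, Tuple


def l2_normalise(vec: List[float]) -> List[float]:
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm else list(vec)


class TinyVectorIndex:
    def __init__(self, dim: int) -> None:
        self.dim = dim
        self._vecs: Dict[str, List[float]] = {}
        self._meta: Dict[str, Dict[str, Any]] = {}

    def add(self, vec_id: str, vec: List[float], meta: Optional[Dict[str, Any]] = None) -> None:
        if len(vec) != self.dim:
            raise ValueError("dimension mismatch")
        self._vecs[vec_id] = l2_normalise(vec)  # re-adding an id overwrites in place
        self._meta[vec_id] = meta or {}

    def search(self, query: List[float], k: int = 10,
               filter_meta: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, Dict[str, Any]]]:
        q = l2_normalise(query)
        # On unit vectors, the dot product equals cosine similarity.
        scored = sorted(
            ((sum(a * b for a, b in zip(q, v)), vid) for vid, v in self._vecs.items()),
            reverse=True,
        )
        candidates = scored[: k * 3] if filter_meta else scored[:k]  # oversample 3x
        hits = []
        for score, vid in candidates:
            meta = self._meta[vid]
            if filter_meta and any(meta.get(f) != want for f, want in filter_meta.items()):
                continue
            hits.append((vid, score, meta))
        return hits[:k]


index = TinyVectorIndex(dim=2)
index.add("a", [1.0, 0.0], {"doc_id": "d1"})
index.add("b", [0.0, 1.0], {"doc_id": "d2"})
index.add("c", [3.0, 3.0], {"doc_id": "d1"})
top = index.search([1.0, 0.0], k=1)
filtered = index.search([1.0, 0.0], k=1, filter_meta={"doc_id": "d2"})
print(top[0][0], filtered[0][0])  # a b
```

One consequence of post-filtering is that a restrictive filter can return fewer than k results even when more matches exist beyond the oversampled window.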

When to use: Semantic search over document chunks, nearest-neighbour retrieval, duplicate detection.

Dimension mismatch: Both store_embedding() and VectorStorage.set() return PersistenceResult.fail(...) on dimension mismatch — no exception propagates.


DatabaseStorage (aiosqlite / asyncpg shape)

Current implementation: aiosqlite with an in-process SQLite database. Supports :memory: for testing.

Production swap: Replace aiosqlite.connect with asyncpg.connect to target PostgreSQL. The SQL is mostly standard; the upsert uses ON CONFLICT ... DO UPDATE SET, which SQLite and PostgreSQL both accept. Parameter placeholders differ, though: aiosqlite uses ?, while asyncpg uses $1, $2, and so on.

Schema:

persistence_store   — primary key-value store with full metadata columns
version_history     — append-only version snapshots

Indices on (tenant_id, namespace), category, and tier for efficient filtering.

Key features:

  • get() checks TTL by comparing created_at + ttl_seconds against NOW().
  • Every set() appends a snapshot row to version_history.
  • get_version_history() and restore_version() are available directly on DatabaseStorage.
  • access_count is incremented atomically on every get().
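
A compact way to see these features together is a synchronous sketch using the standard-library sqlite3 module in place of aiosqlite. The table names come from the schema above, but the column set is a simplified assumption (no tenant, category, or tier columns), and `db_set` / `db_get` are hypothetical helper names.

```python
import sqlite3
import time
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE persistence_store (
    key TEXT PRIMARY KEY, value BLOB, created_at REAL,
    ttl_seconds REAL, access_count INTEGER DEFAULT 0
);
CREATE TABLE version_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT, value BLOB, created_at REAL
);
""")


def db_set(key: str, value: bytes, ttl_seconds: Optional[float] = None,
           now: Optional[float] = None) -> None:
    now = time.time() if now is None else now
    with conn:  # one transaction: upsert plus version snapshot
        conn.execute(
            "INSERT INTO persistence_store (key, value, created_at, ttl_seconds) "
            "VALUES (?, ?, ?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value = excluded.value, "
            "created_at = excluded.created_at, ttl_seconds = excluded.ttl_seconds",
            (key, value, now, ttl_seconds),
        )
        conn.execute(
            "INSERT INTO version_history (key, value, created_at) VALUES (?, ?, ?)",
            (key, value, now),
        )


def db_get(key: str, now: Optional[float] = None) -> Optional[bytes]:
    now = time.time() if now is None else now
    row = conn.execute(
        "SELECT value, created_at, ttl_seconds FROM persistence_store WHERE key = ?", (key,)
    ).fetchone()
    if row is None:
        return None
    value, created_at, ttl = row
    if ttl is not None and created_at + ttl <= now:
        return None  # expired: created_at + ttl_seconds has passed
    with conn:
        conn.execute(
            "UPDATE persistence_store SET access_count = access_count + 1 WHERE key = ?", (key,)
        )
    return value


db_set("k", b"v1", ttl_seconds=10, now=100.0)
db_set("k", b"v2", ttl_seconds=10, now=105.0)
print(db_get("k", now=110.0), db_get("k", now=115.0))  # b'v2' None
```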

When to use: Structured metadata, audit records, checkpoints, version history that must survive process restarts.


ObjectStorage (local filesystem / S3 shape)

Current implementation: Local filesystem with one file per object. Metadata stored as a JSON sidecar (.meta.json).

Production swap: Replace read_bytes/write_bytes with aioboto3 S3 calls. Key naming conventions are identical.

Key features:

  • Path sanitisation: / is replaced with _ and .. with __ in key names.
  • Async I/O via loop.run_in_executor(None, ...) to avoid blocking the event loop.
  • TTL checked at read time from the metadata sidecar.
  • list_keys() lists actual files; .meta.json sidecars are excluded.
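
The one-file-per-object layout with a JSON sidecar can be sketched as follows. The replacement rules in `sanitise_key` are an assumed reading of the path sanitisation bullet (slashes become `_`, `..` becomes `__`), the helper names are hypothetical, and the I/O is synchronous here rather than dispatched through run_in_executor.

```python
import json
import tempfile
from pathlib import Path
from typing import List


def sanitise_key(key: str) -> str:
    """Assumed rule: '..' becomes '__' and '/' becomes '_'."""
    return key.replace("..", "__").replace("/", "_")


def write_object(base: Path, key: str, data: bytes, meta: dict) -> Path:
    path = base / sanitise_key(key)
    path.write_bytes(data)
    # Sidecar file holding metadata such as TTL, stored next to the object.
    path.with_name(path.name + ".meta.json").write_text(json.dumps(meta))
    return path


def list_keys(base: Path) -> List[str]:
    """List object files only; metadata sidecars are excluded."""
    return sorted(p.name for p in base.iterdir() if not p.name.endswith(".meta.json"))


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    stored = write_object(base, "docs/../report.pdf", b"%PDF", {"ttl_seconds": 3600})
    listed = list_keys(base)
    sidecar = json.loads((base / (stored.name + ".meta.json")).read_text())

print(listed, sidecar)  # ['docs____report.pdf'] {'ttl_seconds': 3600}
```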

When to use: Large document blobs (>1 MB), file uploads, multi-part objects, audio/video assets.


Backend Comparison

| Aspect | KeyValueStorage | VectorStorage | DatabaseStorage | ObjectStorage |
| --- | --- | --- | --- | --- |
| Primary use | Caching, sessions | Semantic search | Metadata, audits | Large documents |
| Persistence | None (in-memory) | None (in-memory) | SQLite / PostgreSQL | Filesystem / S3 |
| Query type | Exact key | Cosine similarity | SQL (exact + range) | Exact key |
| TTL support | Native | Via metadata | Via SQL check | Via metadata sidecar |
| Max value size | 100 MB (validated) | embedding_dim floats | 100 MB (validated) | Unlimited |
| Concurrency control | asyncio.Lock | asyncio.Lock | aiosqlite | run_in_executor |
| Version history | DataVersion only | DataVersion only | Full SQL snapshots | DataVersion only |
| Production swap | aioredis | FAISS / pgvector | asyncpg (PostgreSQL) | aioboto3 (S3) |

9. Observability & Metrics

Per-Backend Metrics

pm.get_metrics() aggregates from all four backends and the lifecycle manager:

metrics = pm.get_metrics()

Each backend section contains:

| Metric | Description |
| --- | --- |
| reads | Total get() calls (including misses). |
| writes | Total set() calls. |
| deletes | Total delete() calls. |
| errors | Operations that returned success=False. |
| latency_total_ms | Cumulative latency across all operations. |
| avg_latency_ms | latency_total_ms / (reads + writes + deletes). |

for name in ["key_value", "vector", "relational", "object"]:
    avg = metrics[name]["avg_latency_ms"]
    if avg > 100:
        alert(f"High latency on {name}: {avg:.1f}ms")

Lifecycle Stats

metrics["lifecycle_stats"]
# {"sweeps": 14, "evicted": 23, "promoted": 0}

| Key | Description |
| --- | --- |
| sweeps | Number of completed lifecycle sweeps. |
| evicted | Entries removed due to TTL expiry. |
| promoted | (Reserved for future tier promotion logic.) |

Audit Log Analytics

log = pm.get_audit_log(tenant_id="acme", limit=10000)

# Compute denial rate
denied = [e for e in log if not e["granted"]]
denial_rate = len(denied) / len(log) if log else 0.0

# Top denied operations
from collections import Counter
top_denied = Counter(e["op"] for e in denied).most_common(5)

# Rate limiting events
rate_limited = [e for e in log if e["reason"] == "rate_limited"]

Health Monitoring

health = await pm.health_check()
# {"key_value": True, "vector": True, "relational": True, "object": True}

if not all(health.values()):
    unhealthy = [k for k, v in health.items() if not v]
    alert(f"Unhealthy backends: {unhealthy}")

Batch Success Rate

batch = await pm.batch_store(documents, tenant)
print(f"Success rate: {batch.success_rate:.1%}")
print(f"Failed keys: {list(batch.errors.keys())}")

Storage Cost Estimation

for tier in [DataTier.HOT, DataTier.WARM, DataTier.COLD, DataTier.FROZEN]:
    cost = pm.estimate_storage_cost(tier, size_gb=1000, months=12)
    print(f"{tier.value:10s}: {cost:8.2f} units/year")
# hot       : 12000.00 units/year
# warm      :  3600.00 units/year
# cold      :   600.00 units/year
# frozen    :    60.00 units/year

10. Edge Cases & Failure Handling

Backend Connection Failure

PersistenceManager.initialize() calls all four backend.connect() methods concurrently via asyncio.gather. If any single backend fails to connect, the exception propagates to the caller — the PersistenceManager is not left in a partially initialised state.

Mitigation: Wrap PersistenceManager.create() in a retry loop with exponential backoff for transient connection errors (e.g., Redis not yet ready during container startup).
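
One possible shape for such a retry loop is sketched below. `with_backoff` is a hypothetical helper, and treating OSError (which includes ConnectionError) as the transient failure type is an assumption; the exceptions the real backends raise may differ. The demo uses a stand-in coroutine instead of `PersistenceManager.create`.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_backoff(
    factory: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> T:
    """Await factory(), retrying transient errors with exponentially growing delays."""
    for attempt in range(attempts):
        try:
            return await factory()
        except OSError:  # ConnectionError is a subclass of OSError
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(min(max_delay, base_delay * 2 ** attempt))
    raise ValueError("attempts must be >= 1")


calls = {"n": 0}


async def flaky_create() -> str:
    """Stand-in for a create() call that fails twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("backend not ready")
    return "manager"


result = asyncio.run(with_backoff(flaky_create, base_delay=0.0))
print(result, calls["n"])  # manager 3
```

In application code the factory would be something like `lambda: PersistenceManager.create(config)`, with a non-zero `base_delay`.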

SQLite / Database Failure

DatabaseStorage wraps all reads and writes in aiosqlite coroutines. If the database file is corrupt, inaccessible, or locked, the underlying aiosqlite exception propagates through get() / set() and surfaces as an unhandled exception in the caller. health_check() will return False.

Mitigation: Use a non-:memory: db_path for any production data. Back up the SQLite file regularly. For production, migrate to PostgreSQL via asyncpg.

Vector Dimension Mismatch

store_embedding() checks vector.shape[0] against config.vector_config.extra["embedding_dim"]. On mismatch, it returns PersistenceResult.fail(key, WRITE, "Vector dim mismatch: ...") — no exception. The VectorStorage.set() method performs the same check and returns fail(...) inline.

Mitigation: Set embedding_dim in PersistenceManagerConfig.vector_config.extra to match your embedding model's output dimension before the first write.

Quota / Value Size Exceeded

DataSanitizer.validate_value_size() raises ValueError for values larger than 100 MB. This exception propagates directly from store().

Mitigation: Pre-chunk large documents at the application layer before calling store(). Use batch_store() with category=DataCategory.DOCUMENT to store chunks in parallel.

Key Expiry Race Condition

A key may expire between an exists() check and a subsequent get(). Both KeyValueStorage and DatabaseStorage perform TTL checks inside get() itself and return PersistenceResult.fail(key, READ, "Key expired") when the key's TTL has elapsed. Always check result.success rather than assuming a key is live after exists().

Corrupted or Missing Snapshot

BackupManager.restore_snapshot() loads snapshot data with pickle.loads. A corrupt data.pkl file raises pickle.UnpicklingError. If snapshot_id is not in the in-memory _snapshots dict and data.pkl does not exist on disk, restore_snapshot() returns BatchResult(total=0, succeeded=0, failed=1, errors={"restore": "Snapshot not found"}).

Mitigation: Verify snapshots after creation by calling list_backups() and checking key_count > 0. Store snapshots on durable, replicated storage in production.

Missing Tenant / Permission Denied

If TenantContext.roles does not include a role with the required permission, AccessControlManager.require_permission() raises PermissionError before any backend is contacted. The exception message includes tenant_id, the operation, and the namespace.

If tenant_id or namespace is empty or contains forbidden characters (.., /, \, \x00), DataSanitizer.validate_tenant() raises ValueError immediately.

Transaction Partial Failure

TransactionManager.commit() executes staged operations sequentially. If any operation fails, it runs compensating deletes in reverse order and marks the transaction "rolled_back". Compensating deletes that themselves fail are logged as errors but do not raise — the system records the failure and continues.

Mitigation: Always use async with pm.transaction() as txn_id: (the context manager) rather than calling begin() / commit() manually. The context manager guarantees rollback() is called on any exception. Avoid staging txn_delete operations for data that must be recoverable — compensating ops cannot restore deleted data.

Encryption Key Missing or Invalid

If PERSISTENCE_MASTER_KEY is absent from the environment and no master_key is supplied in the config, EncryptionEngine uses a deterministic development key derived from the string "dev-only-key". This key is not secret. Never store sensitive data with the dev key in any environment beyond local development.

If a retrieve() call attempts to decrypt a blob that was not encrypted (e.g., the key was stored with encrypt=False), the decrypt() call raises a cryptography exception. PersistenceManager.retrieve() silently catches this and returns the raw bytes as-is.

Async Context Without Running Event Loop

All PersistenceManager methods are async. Calling them from synchronous code requires asyncio.run():

# Correct in sync context
result = asyncio.run(pm.retrieve("key", tenant))

# Wrong — returns a coroutine, not the result
result = pm.retrieve("key", tenant)  # ← missing await / asyncio.run

KeyValueStorage spawns a background TTL reaper coroutine via asyncio.create_task() inside connect(). This requires an active event loop when connect() is called. Always call PersistenceManager.create() (or initialize()) from within an async function or asyncio.run().


11. Advanced Usage

Multi-Tenant Setup

import asyncio, os
from fennec_memory.persistence import (
    PersistenceManager, PersistenceManagerConfig,
    TenantContext, DataCategory, DataTier,
    AccessPolicy, Permission, StorageConfig, StorageType,
)

async def main():
    config = PersistenceManagerConfig(
        db_config=StorageConfig(
            storage_type=StorageType.RELATIONAL,
            extra={"db_path": "/var/data/rag.db"},
        ),
        master_key=bytes.fromhex(os.environ["MASTER_KEY_HEX"]),
        enable_encryption=True,
    )
    ppm = await PersistenceManager.create(config)

    async with ppm as pm:
        # Tenant A — full access
        tenant_a = TenantContext("acme", "prod", roles=["admin"])

        # Tenant B — read-only, rate-limited to 50 ops/sec
        tenant_b = TenantContext("globalcorp", "prod", roles=["reader"])
        pm.set_access_policy(AccessPolicy(
            tenant_id="globalcorp",
            namespace="prod",
            allowed_ops={Permission.READ, Permission.LIST},
            denied_ops={Permission.WRITE, Permission.DELETE},
            rate_limit=50,
        ))

        # Tenant A stores a document — Tenant B cannot see it
        await pm.store("confidential:q4_plan", b"...", tenant_a,
                       category=DataCategory.DOCUMENT)

        # Tenant B trying to write raises PermissionError
        try:
            await pm.store("any_key", b"data", tenant_b,
                           category=DataCategory.DOCUMENT)
        except PermissionError as e:
            print(f"Correctly denied: {e}")

asyncio.run(main())

Embedding Storage and Retrieval Pipeline

async def rag_ingest(pm, tenant, docs, embed_fn):
    """Ingest documents with embeddings."""
    for doc_id, text in docs.items():
        # 1. Store raw document
        await pm.store(f"doc:{doc_id}", text.encode(), tenant,
                       category=DataCategory.DOCUMENT)

        # 2. Chunk and embed
        chunks = chunk_text(text, size=512)
        for i, chunk in enumerate(chunks):
            vec = embed_fn(chunk)
            await pm.store_embedding(
                key=f"chunk:{doc_id}:{i}",
                vector=vec,
                tenant=tenant,
                meta={"doc_id": doc_id, "chunk_idx": i, "text": chunk[:100]},
            )

async def rag_retrieve(pm, tenant, query, embed_fn, k=5):
    """Retrieve relevant chunks for a query."""
    query_vec = embed_fn(query)
    results = await pm.vector_search(query_vec, tenant, k=k)

    context_parts = []
    for key, score, meta in results:
        context_parts.append({
            "doc_id": meta["doc_id"],
            "chunk": meta["text"],
            "score": score,
        })
    return context_parts
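
The ingest function above calls `chunk_text`, which is not defined in this document and is presumably supplied by the application. A minimal character-based version might look like the following; real pipelines often chunk by tokens or sentences instead, so treat this as a placeholder.

```python
from typing import List


def chunk_text(text: str, size: int = 512) -> List[str]:
    """Split text into consecutive chunks of at most `size` characters."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [text[i:i + size] for i in range(0, len(text), size)]


print(chunk_text("abcdefghij", size=4))  # ['abcd', 'efgh', 'ij']
```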

Atomic Cross-Backend Transactions

async def update_doc_and_meta(pm, tenant, doc_id, new_content, new_meta):
    """Update document bytes and metadata atomically."""
    async with pm.transaction() as txn_id:
        await pm.txn_set(
            txn_id,
            key=f"doc:{doc_id}",
            value=new_content,
            tenant=tenant,
            backend_name="object",
        )
        await pm.txn_set(
            txn_id,
            key=f"meta:{doc_id}",
            value=new_meta,
            tenant=tenant,
            backend_name="db",
        )
    # Committed — or both rolled back on exception

Version Rollback Workflow

async def safe_update(pm, tenant, key, new_value, category):
    """Update a key, preserving the ability to roll back."""
    # Check current version before modifying
    history = pm.get_version_history(key, tenant, limit=1)
    old_version_id = history[0].version_id if history else None

    # Apply new value
    result = await pm.store(key, new_value, tenant, category=category)

    # On downstream error, roll back
    if validation_fails(result):
        if old_version_id:
            await pm.rollback_to_version(key, tenant, old_version_id, category)
        else:
            await pm.delete(key, tenant, category)

Custom Routing for Compliance

from fennec_memory.persistence import PersistenceManager, RoutingRule, DataCategory, StorageType, DataTier

# Run inside an async function; `config` is a PersistenceManagerConfig as built in section 6.
pm = await PersistenceManager.create(config)
# Force all audit data to a dedicated relational database, never KV
pm.add_routing_rule(RoutingRule(
    name="audit_always_to_db",
    priority=1,   # highest priority — overrides all defaults
    condition=lambda cat, _size, _ext: cat == DataCategory.AUDIT,
    target=StorageType.RELATIONAL,
    tier_hint=DataTier.COLD,
    description="Compliance: audit records must persist in the relational DB",
))

End-to-End RAG Pipeline Example

import asyncio
import os

import numpy as np
from fennec_memory.persistence import (
    PersistenceManager, PersistenceManagerConfig,
    TenantContext, DataCategory, DataTier,
    StorageConfig, StorageType,
    AccessPolicy, Permission, RoutingRule,
)

async def main():
    config = PersistenceManagerConfig(
        vector_config=StorageConfig(
            storage_type=StorageType.VECTOR,
            extra={"embedding_dim": 1536},
        ),
        db_config=StorageConfig(
            storage_type=StorageType.RELATIONAL,
            extra={"db_path": "./rag_meta.db"},
        ),
        object_config=StorageConfig(
            storage_type=StorageType.OBJECT,
            extra={"base_path": "./rag_docs"},
        ),
        enable_encryption=True,
        # Must be exactly 32 bytes; load it from the environment rather than hardcoding it.
        master_key=bytes.fromhex(os.environ["MASTER_KEY_HEX"]),
        backup_base_path="./backups",
    )
    persistence = await PersistenceManager.create(config)

    async with persistence as pm:

        # Access policy
        pm.set_access_policy(AccessPolicy(
            tenant_id="acme",
            namespace="prod",
            allowed_ops={Permission.READ, Permission.WRITE,
                         Permission.LIST, Permission.BACKUP},
            rate_limit=500,
        ))

        tenant = TenantContext(
            tenant_id="acme",
            namespace="prod",
            user_id="pipeline@acme.com",
            roles=["writer"],
        )

        # Store document
        doc_result = await pm.store(
            key="doc:product-manual-v2",
            value=b"<product manual content...>",
            tenant=tenant,
            category=DataCategory.DOCUMENT,
            tags={"type": "manual", "product": "widget-x"},
        )
        print(f"Document stored: v{doc_result.version.version_num}")

        # Store embedding
        embedding = np.random.randn(1536).astype(np.float32)
        await pm.store_embedding(
            key="emb:product-manual-v2",
            vector=embedding,
            tenant=tenant,
            meta={"doc_key": "doc:product-manual-v2", "chunk": 0},
        )

        # Semantic search
        query_vec = np.random.randn(1536).astype(np.float32)
        search_results = await pm.vector_search(
            query_vector=query_vec, tenant=tenant, k=3,
        )
        for key, score, meta in search_results:
            print(f"  [{score:.3f}] {key}")

        # Version history
        history = pm.get_version_history("doc:product-manual-v2", tenant)
        print(f"Versions: {len(history)}")

        # Atomic transaction
        async with pm.transaction() as txn_id:
            await pm.txn_set(
                txn_id, "meta:ingestion:run-42",
                {"status": "complete", "docs": 150}, tenant, "db",
            )
            await pm.txn_delete(txn_id, "meta:ingestion:run-41", tenant, "db")

        # Backup
        backup_tenant = TenantContext(
            tenant_id="acme", namespace="prod", roles=["backup_op"],
        )
        snap = await pm.create_backup(
            tenant=backup_tenant,
            storage_type=StorageType.RELATIONAL,
            tags={"type": "scheduled"},
        )
        print(f"Backup: {snap.key_count} keys, {snap.size_bytes} bytes")

        # Health and metrics
        health = await pm.health_check()
        metrics = pm.get_metrics()
        print(f"Health: {health}")
        print(f"KV writes: {metrics['key_value']['writes']}")

        # Cost estimation
        hot_cost  = pm.estimate_storage_cost(DataTier.HOT,  size_gb=1.0)
        cold_cost = pm.estimate_storage_cost(DataTier.COLD, size_gb=1.0)
        print(f"Saving by moving to COLD: {(1 - cold_cost / hot_cost):.0%}")

asyncio.run(main())

Production Deployment Notes

Encryption key management: Never hardcode master_key. Store it in HashiCorp Vault, AWS Secrets Manager, or GCP Secret Manager. Retrieve at startup and inject as bytes:

import base64
# vault_client is a placeholder for your secrets-manager client; read_secret() is illustrative.
master_key = base64.b64decode(vault_client.read_secret("persistence_master_key"))
config = PersistenceManagerConfig(master_key=master_key, enable_encryption=True)
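Whichever secrets manager is used, it helps to validate the key before handing it to the config. The sketch below assumes the secret is injected as a base64 string in an environment variable; the variable name FENNEC_MASTER_KEY, the helper load_master_key, and the 32-byte length check are illustrative assumptions, not part of the module.

```python
import base64
import os

KEY_LEN = 32  # assumed key size; confirm against the module's encryption settings


def load_master_key(env_var: str = "FENNEC_MASTER_KEY") -> bytes:
    """Decode a base64 master key from the environment and check its length."""
    encoded = os.environ.get(env_var)
    if not encoded:
        raise RuntimeError(f"{env_var} is not set")
    # validate=True rejects characters outside the base64 alphabet
    key = base64.b64decode(encoded, validate=True)
    if len(key) != KEY_LEN:
        raise ValueError(f"expected {KEY_LEN} bytes, got {len(key)}")
    return key
```

Failing fast at startup on a missing or truncated key is usually preferable to discovering the problem on the first encrypted write.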

Backend swap procedure: To switch from SQLite to PostgreSQL, update db_config to point at the PostgreSQL host and port, set username and password, and replace aiosqlite.connect in DatabaseStorage with an asyncpg connection pool. No changes to PersistenceManager or callers are needed.
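One detail to watch during the swap: aiosqlite accepts `?` parameter placeholders, while asyncpg expects PostgreSQL's numbered form (`$1`, `$2`, ...). If the queries inside DatabaseStorage use `?`, they need rewriting. The helper below is a minimal sketch of that conversion; to_asyncpg_placeholders is a hypothetical name, and the function deliberately ignores the case of a literal `?` inside a quoted string or comment.

```python
import re


def to_asyncpg_placeholders(sql: str) -> str:
    """Rewrite qmark placeholders (?) as numbered ones ($1, $2, ...).

    Naive by design: it does not skip '?' inside string literals or comments.
    """
    counter = 0

    def repl(_match: "re.Match[str]") -> str:
        nonlocal counter
        counter += 1
        return f"${counter}"

    return re.sub(r"\?", repl, sql)


print(to_asyncpg_placeholders("SELECT value FROM kv WHERE key = ? AND ns = ?"))
```

For a small, fixed set of queries, rewriting them by hand is likely simpler and safer than converting at runtime.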

Concurrency: PersistenceManager is designed for a single-process async application. For multi-process deployments, replace KeyValueStorage with a Redis-backed implementation so all processes share the same cache state. DatabaseStorage and ObjectStorage can share a single PostgreSQL/S3 endpoint across processes natively.

Lifecycle sweep interval: For production deployments with high cache churn, lower lifecycle_sweep_secs to 10–30 seconds. For deployments with predominantly long-TTL data, 300 seconds is sufficient and reduces CPU overhead.
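The trade-off can be seen in a toy model: an expired entry keeps occupying memory until the next sweep runs, so the sweep interval bounds how long dead entries linger. The TTLStore class and sweep_forever coroutine below are illustrative only and say nothing about how the module's own lifecycle manager is implemented.

```python
import asyncio
import time
from typing import Optional


class TTLStore:
    """Toy in-memory store with an explicit expiry sweep (illustration only)."""

    def __init__(self) -> None:
        self._data = {}  # key -> (value, absolute expiry time)

    def set(self, key: str, value: object, ttl_secs: float,
            now: Optional[float] = None) -> None:
        now = time.monotonic() if now is None else now
        self._data[key] = (value, now + ttl_secs)

    def sweep(self, now: Optional[float] = None) -> int:
        """Delete expired entries and return how many were removed."""
        now = time.monotonic() if now is None else now
        expired = [k for k, (_, exp) in self._data.items() if exp <= now]
        for k in expired:
            del self._data[k]
        return len(expired)

    def __len__(self) -> int:
        return len(self._data)


async def sweep_forever(store: TTLStore, interval_secs: float) -> None:
    """Run sweep() every interval_secs until the task is cancelled."""
    while True:
        await asyncio.sleep(interval_secs)
        store.sweep()
```

In this model an entry with a 5-second TTL and a 300-second sweep interval can sit in memory for almost five minutes after it expires, which is why short-TTL cache workloads benefit from a shorter interval.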

Backup strategy: Schedule create_backup() at least daily for each active tenant on DatabaseStorage. Offload snapshot files to object storage (S3, GCS) immediately after creation. Retain at least 30 days of snapshots for compliance, and automate delete_snapshot() for snapshots older than your retention policy.
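The retention step can be sketched as a pure function that selects which snapshots fall outside the window. SnapshotInfo here is a hypothetical stand-in for the module's snapshot metadata, and the snapshot_id and created_at field names are assumptions; adapt them to whatever your snapshot listing actually returns.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class SnapshotInfo:
    """Hypothetical stand-in for the module's snapshot metadata."""
    snapshot_id: str
    created_at: datetime  # timezone-aware


def expired_snapshots(snapshots: Iterable[SnapshotInfo],
                      retention_days: int = 30,
                      now: Optional[datetime] = None) -> List[SnapshotInfo]:
    """Return snapshots created before the retention cutoff, oldest first."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    return sorted(
        (s for s in snapshots if s.created_at < cutoff),
        key=lambda s: s.created_at,
    )
```

A scheduled job would then pass each returned snapshot to delete_snapshot(), ideally only after confirming the offloaded copy exists in object storage.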

Audit log overflow: The in-process _audit_log list grows indefinitely. In production, override AccessControlManager._record_audit() to write directly to an append-only database table or ship to a SIEM (Splunk, Datadog, OpenSearch) using an async log handler. The default implementation is suitable only for development and short-lived services.
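As a minimal sketch of such a replacement, the class below appends each audit event to a JSON Lines file instead of an in-memory list. JsonlAuditSink and its record() method are hypothetical; the actual signature of AccessControlManager._record_audit() is not shown in this section, so the override would need to map its arguments onto a call like this.

```python
import json
import time
from pathlib import Path


class JsonlAuditSink:
    """Append audit events to a JSON Lines file instead of an in-memory list."""

    def __init__(self, path) -> None:
        self._path = Path(path)

    def record(self, **event) -> None:
        # Add a timestamp unless the caller supplied one.
        event.setdefault("ts", time.time())
        line = json.dumps(event, sort_keys=True, default=str)
        # Mode "a" appends; existing lines are never rewritten.
        with self._path.open("a", encoding="utf-8") as fh:
            fh.write(line + "\n")
```

The file write here is blocking. In an async service it would normally be moved off the event loop (for example via asyncio.to_thread) or replaced with a queue that feeds a log shipper.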

