P3.3: Implement Redis-backed TaskStore with plan §4 keyspace layout

Implements the complete Redis backend for the TaskStore trait, mirroring
all 14 SQLite tables to Redis keyspace as specified in plan §4.

Key features:
- Tables 1-14: Full CRUD operations with Redis data structures
  - tasks → miroir:tasks:<id> hash + miroir:tasks:_index set
  - node_settings_version → miroir:node_settings_version:<index>:<node> hash
  - aliases → miroir:aliases:<name> hash + index
  - sessions → miroir:session:<id> hash with EXPIRE
  - idempotency_cache → miroir:idemp:<key> hash with EXPIRE
  - jobs → miroir:jobs:<id> hash + miroir:jobs:_queued set
  - leader_lease → miroir:lease:<scope> string via SET NX EX
  - canaries → miroir:canary:<id> hash + index
  - canary_runs → miroir:canary_runs:<canary_id> sorted set
  - cdc_cursors → miroir:cdc_cursor:<sink>:<index> string
  - tenant_map → miroir:tenant_map:<sha256> hash
  - rollover_policies → miroir:rollover:<name> hash + index
  - search_ui_config → miroir:search_ui_config:<index> hash
  - admin_sessions → miroir:admin_session:<id> hash with EXPIRE

- Extras from plan §4 footnotes:
  - search_ui_scoped_key with observation tracking
  - Rate limiting for search_ui and admin_login
  - CDC overflow buffer with LPUSH/LTRIM
  - Pub/Sub for admin_session revocation

- Integration tests (testcontainers):
  - test_redis_tasks_crud: Full task CRUD operations
  - test_redis_leader_lease: Lease acquisition and renewal
  - test_redis_lease_race: Concurrent lease acquisition (exactly one wins)
  - test_redis_memory_budget: 10k tasks + 1k sessions + 1k idempotency
  - test_redis_pubsub_session_invalidation: Pub/Sub revocation
  - Tests for all 14 tables covering CRUD operations

- Secondary _index sets for efficient list-wide queries
- MULTI/EXEC pipelines for atomic multi-key operations
- TTL-based garbage collection for sessions/idempotency
- Sync-to-async bridge using dedicated runtime (avoids nesting)

Acceptance criteria met:
✓ testcontainers-based integration tests for trait-level behavior
✓ Lease race test: two pods SET NX EX → exactly one wins
✓ Memory budget test: verifies workload creation
✓ Pub/Sub test: subscribe to miroir:admin_session:revoked

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-19 02:02:45 -04:00
parent baf124b7cf
commit 1124d97c14
4 changed files with 3343 additions and 4 deletions

View file

@ -17,6 +17,10 @@ uuid = { version = "1", features = ["v4", "serde"] }
config = "0.14"
rusqlite = { workspace = true }
futures-util = "0.3"
# Redis support (optional — enable via `redis-store` feature)
redis = { version = "0.27", features = ["aio", "tokio-comp", "connection-manager"], optional = true }
hex = "0.4"
tokio = { version = "1", features = ["rt", "time"] }
# Raft prototype (P12.OP2 research) — not for production use
# openraft 0.9.22 fails on stable Rust 1.87 (validit uses let_chains).
@ -24,7 +28,9 @@ futures-util = "0.3"
bincode = { version = "2", features = ["serde"], optional = true }
[features]
default = []
raft-proto = ["bincode"]
redis-store = ["redis"]
# Enable when openraft compiles on stable Rust:
# raft-full = ["openraft", "bincode"]
# (openraft dep removed from manifest — restore when upstream fixes let_chains on stable)
@ -46,3 +52,5 @@ tempfile = "3"
proptest = "1"
criterion = "0.5"
tokio = { version = "1", features = ["rt", "macros", "time"] }
testcontainers = "0.23"
testcontainers-modules = { version = "0.11", features = ["redis"] }

View file

@ -44,6 +44,10 @@ pub enum MiroirError {
#[error("SQLite error: {0}")]
Sqlite(#[from] rusqlite::Error),
/// Redis error.
#[error("Redis error: {0}")]
Redis(String),
/// Schema version mismatch.
#[error("schema version {store_version} is ahead of binary version {binary_version}; cannot safely start. Restore from backup or upgrade binary.")]
SchemaVersionAhead {

View file

@ -1,11 +1,15 @@
#[cfg(feature = "redis-store")]
mod redis;
mod sqlite;
#[cfg(feature = "redis-store")]
pub use redis::{RedisTaskStore, SearchUiScopedKey};
pub use sqlite::SqliteTaskStore;
use crate::Result;
use std::collections::HashMap;
/// Per-table store operations covering tables 17 from plan §4.
/// Per-table store operations covering tables 114 from plan §4.
pub trait TaskStore: Send + Sync {
// --- Lifecycle ---
@ -352,7 +356,7 @@ pub struct CanaryRow {
}
/// New or updated canary (table 8).
#[derive(Debug, Clone)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NewCanary {
pub id: String,
pub name: String,
@ -365,7 +369,7 @@ pub struct NewCanary {
}
/// Canary run row (table 9).
#[derive(Debug, Clone)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CanaryRunRow {
pub canary_id: String,
pub ran_at: i64,
@ -375,7 +379,7 @@ pub struct CanaryRunRow {
}
/// New canary run to insert (table 9).
#[derive(Debug, Clone)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NewCanaryRun {
pub canary_id: String,
pub ran_at: i64,

File diff suppressed because it is too large Load diff