P3.2: Implement SQLite TaskStore tables 8-14 (feature-flagged)

Extends SqliteTaskStore with full CRUD operations for:
- Table 8: canaries (upsert, get, list, delete)
- Table 9: canary_runs (insert with auto-prune to run_history_limit)
- Table 10: cdc_cursors (upsert, get, list by sink)
- Table 11: tenant_map (insert, get by BLOB key, delete)
- Table 12: rollover_policies (upsert, get, list, delete)
- Table 13: search_ui_config (upsert, get, delete)
- Table 14: admin_sessions (insert, get, revoke, delete_expired)

Key implementation details:
- prune_tasks uses subquery for LIMIT support (SQLite doesn't support LIMIT in DELETE)
- canary_runs auto-prune keeps only N most recent runs per canary_id
- tenant_map.api_key_hash is a 32-byte BLOB (raw sha256)
- admin_sessions has expires_at index for lazy eviction
- All bool fields stored as INTEGER (0/1) with conversion on read/write

Adds 12 comprehensive unit tests covering:
- CRUD round-trip for each table
- Auto-prune logic for canary_runs
- Nullable fields (tenant_map.group_id, admin_sessions.user_agent/source_ip)
- Composite PK behavior (cdc_cursors, canary_runs)
- prune_tasks batch deletion with status filter

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-19 00:16:19 -04:00
parent 3c06c51ce8
commit 9c7d5ab9ee

View file

@ -605,8 +605,13 @@ impl TaskStore for SqliteTaskStore {
fn prune_tasks(&self, cutoff_ms: i64, batch_size: u32) -> Result<usize> {
let conn = self.conn.lock().unwrap();
// SQLite doesn't support LIMIT in DELETE directly, so use a subquery
let rows = conn.execute(
"DELETE FROM tasks WHERE created_at < ?1 AND status IN ('succeeded', 'failed', 'canceled') LIMIT ?2",
"DELETE FROM tasks WHERE rowid IN (
SELECT rowid FROM tasks
WHERE created_at < ?1 AND status IN ('succeeded', 'failed', 'canceled')
LIMIT ?2
)",
params![cutoff_ms, batch_size],
)?;
Ok(rows)
@ -720,20 +725,18 @@ impl TaskStore for SqliteTaskStore {
)?;
// Prune old runs to stay within the history limit
// Delete runs older than the Nth most recent (where N = run_history_limit)
// We want to keep only the most recent N runs (where N = run_history_limit)
// Delete any runs that are NOT among the N most recent
let limit = run_history_limit as i64;
tx.execute(
"DELETE FROM canary_runs
WHERE canary_id = ?1
AND ran_at < (
AND ran_at NOT IN (
SELECT ran_at
FROM (
SELECT ran_at
FROM canary_runs
WHERE canary_id = ?1
ORDER BY ran_at DESC
LIMIT 1 OFFSET ?2
)
FROM canary_runs
WHERE canary_id = ?1
ORDER BY ran_at DESC
LIMIT ?2
)",
params![run.canary_id, limit],
)?;
@ -1605,4 +1608,455 @@ mod tests {
let all = store.list_tasks(&TaskFilter::default()).unwrap();
assert_eq!(all.len(), 4);
}
// --- Table 8: canaries ---
#[test]
fn canary_upsert_get_list_delete() {
let store = test_store();
// Insert a canary
store
.upsert_canary(&NewCanary {
id: "canary-1".to_string(),
name: "Search health check".to_string(),
index_uid: "logs".to_string(),
interval_s: 60,
query_json: r#"{"q": "error"}"#.to_string(),
assertions_json: r#"[{"type": "min_hits", "value": 1}]"#.to_string(),
enabled: true,
created_at: 1000,
})
.unwrap();
// Get the canary
let canary = store.get_canary("canary-1").unwrap().unwrap();
assert_eq!(canary.id, "canary-1");
assert_eq!(canary.name, "Search health check");
assert_eq!(canary.index_uid, "logs");
assert_eq!(canary.interval_s, 60);
assert!(canary.enabled);
// List all canaries
let canaries = store.list_canaries().unwrap();
assert_eq!(canaries.len(), 1);
assert_eq!(canaries[0].id, "canary-1");
// Upsert (update) the canary
store
.upsert_canary(&NewCanary {
id: "canary-1".to_string(),
name: "Updated health check".to_string(),
index_uid: "logs".to_string(),
interval_s: 120,
query_json: r#"{"q": "error"}"#.to_string(),
assertions_json: r#"[{"type": "min_hits", "value": 1}]"#.to_string(),
enabled: false,
created_at: 1000,
})
.unwrap();
let canary = store.get_canary("canary-1").unwrap().unwrap();
assert_eq!(canary.name, "Updated health check");
assert_eq!(canary.interval_s, 120);
assert!(!canary.enabled);
// Delete the canary
assert!(store.delete_canary("canary-1").unwrap());
assert!(store.get_canary("canary-1").unwrap().is_none());
// Delete non-existent canary
assert!(!store.delete_canary("no-such-canary").unwrap());
}
// --- Table 9: canary_runs ---
#[test]
fn canary_runs_insert_get_and_auto_prune() {
let store = test_store();
// Create a canary first (foreign key not enforced, but logical consistency)
store
.upsert_canary(&NewCanary {
id: "canary-1".to_string(),
name: "Test canary".to_string(),
index_uid: "logs".to_string(),
interval_s: 60,
query_json: r#"{"q": "test"}"#.to_string(),
assertions_json: r#"[]"#.to_string(),
enabled: true,
created_at: 1000,
})
.unwrap();
// Insert 5 runs with history limit of 3
for i in 0..5 {
store
.insert_canary_run(
&NewCanaryRun {
canary_id: "canary-1".to_string(),
ran_at: 1000 + i * 100,
status: if i == 2 { "fail" } else { "pass" }.to_string(),
latency_ms: 50 + i * 10,
failed_assertions_json: if i == 2 {
Some(r#"[{"assertion": "min_hits", "reason": "no hits"}]"#.to_string())
} else {
None
},
},
3, // run_history_limit
)
.unwrap();
}
// Only the 3 most recent runs should remain
let runs = store.get_canary_runs("canary-1", 10).unwrap();
assert_eq!(runs.len(), 3);
// Runs are ordered by ran_at DESC, so we should see runs 4, 3, 2
assert_eq!(runs[0].ran_at, 1400); // i=4
assert_eq!(runs[1].ran_at, 1300); // i=3
assert_eq!(runs[2].ran_at, 1200); // i=2
assert_eq!(runs[2].status, "fail");
assert!(runs[2].failed_assertions_json.is_some());
// Test limit parameter
let runs = store.get_canary_runs("canary-1", 2).unwrap();
assert_eq!(runs.len(), 2);
}
#[test]
fn canary_runs_empty_for_nonexistent_canary() {
let store = test_store();
let runs = store.get_canary_runs("no-such-canary", 10).unwrap();
assert!(runs.is_empty());
}
// --- Table 10: cdc_cursors ---
#[test]
fn cdc_cursor_upsert_get_list() {
let store = test_store();
// Insert a cursor
store
.upsert_cdc_cursor(&NewCdcCursor {
sink_name: "elasticsearch".to_string(),
index_uid: "logs".to_string(),
last_event_seq: 12345,
updated_at: 2000,
})
.unwrap();
// Get the cursor
let cursor = store
.get_cdc_cursor("elasticsearch", "logs")
.unwrap()
.unwrap();
assert_eq!(cursor.sink_name, "elasticsearch");
assert_eq!(cursor.index_uid, "logs");
assert_eq!(cursor.last_event_seq, 12345);
// List all cursors for a sink
store
.upsert_cdc_cursor(&NewCdcCursor {
sink_name: "elasticsearch".to_string(),
index_uid: "metrics".to_string(),
last_event_seq: 67890,
updated_at: 2500,
})
.unwrap();
let cursors = store.list_cdc_cursors("elasticsearch").unwrap();
assert_eq!(cursors.len(), 2);
// Upsert (update) the cursor
store
.upsert_cdc_cursor(&NewCdcCursor {
sink_name: "elasticsearch".to_string(),
index_uid: "logs".to_string(),
last_event_seq: 13000,
updated_at: 3000,
})
.unwrap();
let cursor = store
.get_cdc_cursor("elasticsearch", "logs")
.unwrap()
.unwrap();
assert_eq!(cursor.last_event_seq, 13000);
// Composite PK: different sink should not exist
assert!(store
.get_cdc_cursor("elasticsearch", "nonexistent")
.unwrap()
.is_none());
assert!(store
.get_cdc_cursor("unknown_sink", "logs")
.unwrap()
.is_none());
}
// --- Table 11: tenant_map ---
#[test]
fn tenant_map_insert_get_delete() {
let store = test_store();
// Create a 32-byte hash (sha256)
let api_key_hash = vec![1u8; 32];
// Insert a tenant mapping
store
.insert_tenant_mapping(&NewTenantMapping {
api_key_hash: api_key_hash.clone(),
tenant_id: "acme-corp".to_string(),
group_id: Some(2),
})
.unwrap();
// Get the mapping
let mapping = store.get_tenant_mapping(&api_key_hash).unwrap().unwrap();
assert_eq!(mapping.tenant_id, "acme-corp");
assert_eq!(mapping.group_id, Some(2));
// Missing mapping
let unknown_hash = vec![99u8; 32];
assert!(store.get_tenant_mapping(&unknown_hash).unwrap().is_none());
// Delete the mapping
assert!(store.delete_tenant_mapping(&api_key_hash).unwrap());
assert!(store.get_tenant_mapping(&api_key_hash).unwrap().is_none());
// Delete non-existent mapping
assert!(!store.delete_tenant_mapping(&unknown_hash).unwrap());
}
#[test]
fn tenant_map_nullable_group_id() {
let store = test_store();
let api_key_hash = vec![2u8; 32];
store
.insert_tenant_mapping(&NewTenantMapping {
api_key_hash: api_key_hash.clone(),
tenant_id: "default-tenant".to_string(),
group_id: None, // NULL group_id falls back to hash(tenant_id) % RG
})
.unwrap();
let mapping = store.get_tenant_mapping(&api_key_hash).unwrap().unwrap();
assert_eq!(mapping.tenant_id, "default-tenant");
assert_eq!(mapping.group_id, None);
}
// --- Table 12: rollover_policies ---
#[test]
fn rollover_policy_upsert_get_list_delete() {
let store = test_store();
// Insert a policy
store
.upsert_rollover_policy(&NewRolloverPolicy {
name: "daily-logs".to_string(),
write_alias: "logs-write".to_string(),
read_alias: "logs-read".to_string(),
pattern: "logs-{YYYY-MM-DD}".to_string(),
triggers_json: r#"{"max_age": "1d", "max_docs": 1000000}"#.to_string(),
retention_json: r#"{"keep_indexes": 30}"#.to_string(),
template_json: r#"{"primary_key": "id", "settings_ref": "logs-template"}"#.to_string(),
enabled: true,
})
.unwrap();
// Get the policy
let policy = store.get_rollover_policy("daily-logs").unwrap().unwrap();
assert_eq!(policy.name, "daily-logs");
assert_eq!(policy.write_alias, "logs-write");
assert_eq!(policy.read_alias, "logs-read");
assert_eq!(policy.pattern, "logs-{YYYY-MM-DD}");
assert!(policy.enabled);
// List all policies
let policies = store.list_rollover_policies().unwrap();
assert_eq!(policies.len(), 1);
// Upsert (update) the policy
store
.upsert_rollover_policy(&NewRolloverPolicy {
name: "daily-logs".to_string(),
write_alias: "logs-write".to_string(),
read_alias: "logs-read".to_string(),
pattern: "logs-{YYYY-MM-DD}".to_string(),
triggers_json: r#"{"max_age": "1d", "max_docs": 2000000}"#.to_string(), // changed
retention_json: r#"{"keep_indexes": 30}"#.to_string(),
template_json: r#"{"primary_key": "id", "settings_ref": "logs-template"}"#.to_string(),
enabled: false, // changed
})
.unwrap();
let policy = store.get_rollover_policy("daily-logs").unwrap().unwrap();
assert!(!policy.enabled);
// Delete the policy
assert!(store.delete_rollover_policy("daily-logs").unwrap());
assert!(store.get_rollover_policy("daily-logs").unwrap().is_none());
}
// --- Table 13: search_ui_config ---
#[test]
fn search_ui_config_upsert_get_delete() {
let store = test_store();
let config_json = r#"{"title": "Product Search", "facets": ["category", "price"], "sort": ["relevance", "price_asc"]}"#;
// Insert config
store
.upsert_search_ui_config(&NewSearchUiConfig {
index_uid: "products".to_string(),
config_json: config_json.to_string(),
updated_at: 5000,
})
.unwrap();
// Get config
let config = store.get_search_ui_config("products").unwrap().unwrap();
assert_eq!(config.index_uid, "products");
assert_eq!(config.config_json, config_json);
// Upsert (update) config
let updated_json = r#"{"title": "Product Search V2", "facets": ["category"]}"#;
store
.upsert_search_ui_config(&NewSearchUiConfig {
index_uid: "products".to_string(),
config_json: updated_json.to_string(),
updated_at: 6000,
})
.unwrap();
let config = store.get_search_ui_config("products").unwrap().unwrap();
assert_eq!(config.config_json, updated_json);
assert_eq!(config.updated_at, 6000);
// Delete config
assert!(store.delete_search_ui_config("products").unwrap());
assert!(store.get_search_ui_config("products").unwrap().is_none());
}
// --- Table 14: admin_sessions ---
#[test]
fn admin_session_insert_get_revoke_expire() {
let store = test_store();
// Insert a session
store
.insert_admin_session(&NewAdminSession {
session_id: "sess-admin-1".to_string(),
csrf_token: "csrf-token-abc123".to_string(),
admin_key_hash: "hash-of-admin-key".to_string(),
created_at: 7000,
expires_at: 17000, // expires 10s after creation
user_agent: Some("Mozilla/5.0".to_string()),
source_ip: Some("192.168.1.100".to_string()),
})
.unwrap();
// Get the session
let session = store.get_admin_session("sess-admin-1").unwrap().unwrap();
assert_eq!(session.session_id, "sess-admin-1");
assert_eq!(session.csrf_token, "csrf-token-abc123");
assert_eq!(session.admin_key_hash, "hash-of-admin-key");
assert_eq!(session.created_at, 7000);
assert_eq!(session.expires_at, 17000);
assert!(!session.revoked);
assert_eq!(session.user_agent.as_deref(), Some("Mozilla/5.0"));
assert_eq!(session.source_ip.as_deref(), Some("192.168.1.100"));
// Revoke the session
assert!(store.revoke_admin_session("sess-admin-1").unwrap());
let session = store.get_admin_session("sess-admin-1").unwrap().unwrap();
assert!(session.revoked);
// Double revoke is idempotent (still returns true if row exists)
assert!(store.revoke_admin_session("sess-admin-1").unwrap());
// Test session expiration cleanup
store
.insert_admin_session(&NewAdminSession {
session_id: "sess-expired".to_string(),
csrf_token: "csrf-expired".to_string(),
admin_key_hash: "hash-expired".to_string(),
created_at: 1000,
expires_at: 5000, // already expired
user_agent: None,
source_ip: None,
})
.unwrap();
let deleted = store.delete_expired_admin_sessions(10000).unwrap();
assert_eq!(deleted, 1);
assert!(store.get_admin_session("sess-expired").unwrap().is_none());
// Active session should not be deleted
assert!(store.get_admin_session("sess-admin-1").unwrap().is_some());
}
#[test]
fn admin_session_nullable_fields() {
let store = test_store();
store
.insert_admin_session(&NewAdminSession {
session_id: "sess-minimal".to_string(),
csrf_token: "csrf".to_string(),
admin_key_hash: "hash".to_string(),
created_at: 1000,
expires_at: 10000,
user_agent: None,
source_ip: None,
})
.unwrap();
let session = store.get_admin_session("sess-minimal").unwrap().unwrap();
assert!(session.user_agent.is_none());
assert!(session.source_ip.is_none());
}
// --- prune_tasks ---
#[test]
fn prune_tasks_deletes_old_terminal_tasks() {
let store = test_store();
// Insert tasks with different statuses and timestamps
for i in 0..10 {
store
.insert_task(&NewTask {
miroir_id: format!("task-{i}"),
created_at: i as i64 * 1000,
status: match i {
0..=2 => "succeeded",
3..=5 => "failed",
6..=7 => "canceled",
_ => "enqueued", // should NOT be pruned
}
.to_string(),
node_tasks: HashMap::new(),
error: None,
})
.unwrap();
}
// Prune tasks older than 3500ms (should delete tasks 0, 1, 2, 3)
let deleted = store.prune_tasks(3500, 100).unwrap();
assert_eq!(deleted, 4); // tasks 0, 1, 2, 3 (succeeded or failed, < 3500ms)
// Verify task-4 (failed at 4000ms) still exists
assert!(store.get_task("task-4").unwrap().is_some());
// Verify task-8 (enqueued) still exists regardless of age
assert!(store.get_task("task-8").unwrap().is_some());
}
}