P6.5: Mode C work-queued chunked jobs - complete worker processing logic

Implements plan §14.5 Mode C work-queued chunked jobs for large
background operations (dump import, reshard backfill).

## Changes

### Core Implementation
- mode_c_coordinator.rs: Job coordination with claim/reclaim/heartbeat
- mode_c_worker/mod.rs: Worker loop for processing jobs
- mode_c_worker/acceptance_tests.rs: Full acceptance test suite
- reshard_chunking.rs: Shard-id range chunking for reshard backfill

### Database
- migrations/005_jobs_chunking.sql: Add chunking fields (parent_job_id,
  chunk_index, total_chunks, created_at) with indexes

### Integration
- admin_endpoints.rs: Add ModeCWorker to AppState
- task_store: Updated to support chunking fields
- All test fixtures updated with new NewJob fields

## Acceptance Tests Pass
- 1 GB dump splits into 4× 256 MiB chunks; 3 pods claim in parallel
- Claim expires in 30s; another pod resumes at last_cursor
- HPA queue depth metric drives scaling (queue_depth > 10)
- Two concurrent dumps interleave without starvation
- Reshard backfill splits by shard-id range

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-23 07:04:23 -04:00
parent 4fbe81342f
commit 8b1cf42863
17 changed files with 7559 additions and 7258 deletions

View file

@ -38,7 +38,7 @@
{"id":"miroir-9dj.7","title":"P2.7 Auth: bearer-token dispatch (plan §5 rules 0-5) + X-Admin-Key","description":"## What\n\nImplement the bearer-token dispatch chain from plan §5 \"Bearer token dispatch\":\n\n0. **Dispatch-exempt check** — if (method, path) is in the exempt list, run handler directly\n1. **JWT-shape probe** — if token parses as JWT, validate as search-UI JWT (signature, exp/nbf, kid, idx, scope). Parseable-but-invalid → 401 `miroir_jwt_invalid`. Signature-valid but scope mismatch → 403 `miroir_jwt_scope_denied`. Phase 5 §13.21 adds the JWT validation; Phase 2 stubs this to \"not-a-jwt → next step\"\n2. **Admin-path opaque-token match** — path starts with `/_miroir/`, match against `admin_key`. Exempt: `/_miroir/metrics`, `/_miroir/ui/search/locale/*`, `POST /_miroir/admin/login`, `GET /_miroir/ui/search/{index}/session`\n3. **Master-key match** — other paths → `master_key`\n4. **Mismatch** → 401 `miroir_invalid_auth`\n5. **Dispatch-exempt endpoints** — exhaustive list in plan §5 rule 5\n\nPlus: `X-Admin-Key` short-circuit for admin endpoints.\n\n## Why\n\nPlan §5: \"Three token types can appear on `Authorization: Bearer <value>` simultaneously — the `master_key`, the `admin_key`, and a search UI JWT. Miroir resolves them deterministically.\" Without a consistent dispatch chain, Phase 5 §13.21's JWT path conflicts with admin/master key on the same header. Getting it deterministic now means Phase 5 just slots JWT validation in at rule 1.\n\n## Details\n\n**Rule 0 list** (needs to be kept in sync with §5 table 5):\n- `GET /_miroir/metrics` — admin-key-optional\n- `GET /_miroir/ui/search/locale/*` — unauthenticated\n- `POST /_miroir/admin/login` — credentials in body\n- `GET /_miroir/ui/search/{index}/session` — auth per `search_ui.auth.mode`\n- `GET /ui/search/{index}` — public SPA\n\n**Constant-time comparison**: use `subtle::ConstantTimeEq` for all opaque-token comparisons to prevent timing side-channels.\n\n**Rate-limit hooks**: wire in `miroir:ratelimit:adminlogin:<ip>` and `miroir:ratelimit:searchui:<ip>` bucket counters from Phase 3 task store; Phase 2 may keep in-memory until Phase 6 multi-pod.\n\n## Acceptance\n\n- [ ] Every row in plan §5 rule 5 exempt list has a unit test (request does NOT match admin_key / master_key)\n- [ ] Opaque token on `/_miroir/*` matches only admin_key; never master_key\n- [ ] Opaque token on other paths matches only master_key; never admin_key\n- [ ] Missing Authorization on auth-gated endpoints → 401 `miroir_invalid_auth`\n- [ ] `X-Admin-Key` alone gates admin endpoints equivalently to Bearer admin_key\n- [ ] Constant-time compare: test with timing-injection harness shows no measurable delta between \"wrong length\" and \"wrong bytes\"","design":"","acceptance_criteria":"","notes":"","status":"closed","priority":0,"issue_type":"task","assignee":"claude-code-glm-4.7-mobile-gaming","created_at":"2026-04-18T21:28:30.212339590Z","created_by":"coding","updated_at":"2026-05-22T19:32:10.048664285Z","closed_at":"2026-05-22T19:32:10.048664285Z","close_reason":"Bearer-token dispatch chain per plan §5 rules 0-5 is fully implemented with 68 passing tests. All acceptance criteria met: dispatch-exempt endpoints, JWT validation, admin/master key separation, X-Admin-Key short-circuit, constant-time comparison with timing harness.","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-2"]}
{"id":"miroir-9dj.8","title":"P2.8 Middleware: structured logging + prometheus metrics + request IDs","description":"## What\n\nImplement `miroir-proxy::middleware`:\n- Request ID generation (UUIDv7 prefix short-hashed) attached as `X-Request-Id` on every response\n- Structured JSON log per plan §10 shape (timestamp, level, message, index, duration_ms, node_count, estimated_hits, degraded)\n- Prometheus histogram: `miroir_request_duration_seconds{method, path_template, status}`\n- Counter: `miroir_requests_total{method, path_template, status}`\n- Gauge: `miroir_requests_in_flight`\n- Scatter metrics: `miroir_scatter_fan_out_size`, `miroir_scatter_partial_responses_total`, `miroir_scatter_retries_total`\n- Node metrics: `miroir_node_healthy`, `miroir_node_request_duration_seconds`, `miroir_node_errors_total`\n\n## Why\n\nPhase 7 builds dashboards and alerts on these exact metric names. Defining them here (not at Phase 7) means every P2.X feature already emits the right signals without retrofit.\n\n**`path_template` (not `path`)** is critical: `/indexes/{uid}/search` is a template; substituting actual values produces high-cardinality labels that OOM Prometheus. Axum provides the matched route template via `MatchedPath` extractor.\n\n## Details\n\n**Log format** (plan §10 exact shape):\n```json\n{\n \"timestamp\": \"2026-05-01T12:00:00.000Z\",\n \"level\": \"info\",\n \"message\": \"search completed\",\n \"index\": \"products\",\n \"duration_ms\": 42,\n \"node_count\": 3,\n \"estimated_hits\": 15420,\n \"degraded\": false\n}\n```\n\nLogs go to stdout, one JSON object per line. Use `tracing-subscriber` with `fmt::layer().json()`.\n\n**In-flight gauge**: increment on request start, decrement via `Drop` guard so even panics decrement correctly.\n\n**Metrics server on `:9090`**: separate axum listener from the client API; no auth (bound to cluster network); `/metrics` returns prometheus exposition format.\n\n## Acceptance\n\n- [ ] `curl localhost:9090/metrics` returns all listed metrics with ≥ 1 sample after a single request\n- [ ] `jq` parses every log line without error\n- [ ] Request ID appears in response header and in the log entry for that request\n- [ ] High-cardinality defense: `path_template` never contains a UUID or arbitrary UID","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-04-18T21:28:30.240006979Z","created_by":"coding","updated_at":"2026-04-18T21:28:30.240006979Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-2"]}
{"id":"miroir-afh","title":"Phase 7 — Observability + Ops (§10)","description":"## Phase 7 Epic — Observability + Ops\n\nShips the metric set, log format, tracing hooks, alert rules, and Grafana dashboard specified in plan §10 + the resource-pressure additions from §14.9.\n\n## Why A Dedicated Phase\n\nObservability accretes badly: if you wire metrics per-feature, you end up with inconsistent naming, duplicate counters, and missing labels. Plan §10 names every metric up front so Phase 5 can depend on a stable registry. This phase makes sure the registry lines up with the plan and the Grafana dashboard reads real data.\n\n## Scope (plan §10 + §14.9)\n\n**Health endpoints**\n- `GET /health` — Meilisearch-compatible, used as liveness\n- `GET /_miroir/ready` — readiness; 503 until covering quorum reachable\n- `GET /_miroir/topology` — full cluster state (shape in plan §10)\n\n**Prometheus metrics** (all prefixed `miroir_`)\n- Requests: `miroir_request_duration_seconds{method,path_template,status}` histogram, `miroir_requests_total` counter, `miroir_requests_in_flight` gauge\n- Node health: `miroir_node_healthy{node_id}`, `miroir_node_request_duration_seconds{node_id,operation}`, `miroir_node_errors_total{node_id,error_type}`\n- Shards: `miroir_shard_coverage`, `miroir_degraded_shards_total`, `miroir_shard_distribution{node_id}`\n- Task registry: `miroir_task_processing_age_seconds`, `miroir_tasks_total{status}`, `miroir_task_registry_size`\n- Scatter-gather: `miroir_scatter_fan_out_size`, `miroir_scatter_partial_responses_total`, `miroir_scatter_retries_total`\n- Rebalancer: `miroir_rebalance_in_progress`, `miroir_rebalance_documents_migrated_total`, `miroir_rebalance_duration_seconds`\n- §13.1121 family groups (all 11 listed in plan §10 \"Advanced capabilities metrics\")\n- §14.9 resource-pressure: `miroir_memory_pressure`, `miroir_cpu_throttled_seconds_total`, `miroir_request_queue_depth`, `miroir_background_queue_depth{job_type}`, `miroir_peer_pod_count`, `miroir_leader`, `miroir_owned_shards_count`\n\n**Ports**\n- Port 7700: `/_miroir/metrics` admin-key-gated\n- Port 9090: `/metrics` unauthenticated, pod-internal, ServiceMonitor target\n\n**Grafana dashboard** (`dashboards/miroir-overview.json`) — 8 panels per plan §10 + feature-flag-gated panels for §13.1121 when flags are on\n\n**ServiceMonitor** (plan §10 YAML)\n\n**Alerting** (`PrometheusRule` per plan §10 + §14.9)\n- MiroirDegradedShards, MiroirNodeDown, MiroirHighSearchLatency, MiroirTaskStuck, MiroirRebalanceStuck\n- MiroirSettingsDivergence (paired with §13.5 reconciler)\n- MiroirAntientropyMismatch (paired with §13.8 at 3 consecutive passes)\n- MiroirMemoryPressure, MiroirRequestQueueBacklog, MiroirBackgroundJobBacklog, MiroirPeerDiscoveryGap, MiroirNoLeader\n\n**Tracing (optional)** — OpenTelemetry with configurable sample_rate; disabled by default; each search produces one parent span with a child per covering-set node\n\n**Log format** — structured JSON to stdout; schema per plan §10\n\n## Definition of Done\n\n- [ ] Every metric in plan §10 + §14.9 registered and scraping on port 9090\n- [ ] `/_miroir/metrics` on port 7700 returns identical data when admin-key-authenticated\n- [ ] Grafana dashboard JSON imports cleanly; all 8 core panels render from a live scrape\n- [ ] All 12 alerts live in the shipped PrometheusRule manifest\n- [ ] OTel trace contains one parent span per request and one child per node call\n- [ ] Log entries match the schema verbatim (parseable as JSON)\n- [ ] ServiceMonitor picks up the metrics service in a kind cluster test","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"epic","created_at":"2026-04-18T21:21:13.574251289Z","created_by":"coding","updated_at":"2026-04-18T21:23:08.669964534Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase","phase-7"],"dependencies":[{"issue_id":"miroir-afh","depends_on_id":"miroir-9dj","type":"blocks","created_at":"2026-04-18T21:23:08.669932412Z","created_by":"coding","metadata":"{}","thread_id":""}]}
{"id":"miroir-afh.1","title":"P7.1 Core metrics families: requests, nodes, shards, tasks, scatter, rebalancer","description":"## What\n\nRegister the plan §10 core metric families on `:9090/metrics` AND `/_miroir/metrics` (admin-key gated mirror):\n\n**Requests** (histogram + counter + gauge):\n- `miroir_request_duration_seconds{method, path_template, status}`\n- `miroir_requests_total{method, path_template, status}`\n- `miroir_requests_in_flight`\n\n**Node health**:\n- `miroir_node_healthy{node_id}`\n- `miroir_node_request_duration_seconds{node_id, operation}`\n- `miroir_node_errors_total{node_id, error_type}`\n\n**Shards**:\n- `miroir_shard_coverage`\n- `miroir_degraded_shards_total`\n- `miroir_shard_distribution{node_id}`\n\n**Tasks**:\n- `miroir_task_processing_age_seconds`\n- `miroir_tasks_total{status}`\n- `miroir_task_registry_size`\n\n**Scatter-gather**:\n- `miroir_scatter_fan_out_size`\n- `miroir_scatter_partial_responses_total`\n- `miroir_scatter_retries_total`\n\n**Rebalancer**:\n- `miroir_rebalance_in_progress`\n- `miroir_rebalance_documents_migrated_total`\n- `miroir_rebalance_duration_seconds`\n\n## Why\n\nPlan §10 + Phase 9 dashboard + alerts all depend on these exact names. Naming is a contract — changing them post-v1.0 breaks every downstream dashboard + alert rule.\n\n## Details\n\n**Label cardinality defense**:\n- `path_template` MUST be the axum matched path (not the raw URL)\n- `node_id` is bounded (~dozens)\n- `status` is the HTTP status code (~10s)\n- `error_type` is enum-limited (not a raw error string)\n- `operation` is the backend call name ({search, documents_post, stats_get, ...})\n\n**Histogram buckets**: use prometheus default buckets for duration histograms unless the plan calls out specifics.\n\n**Port 9090 (unauth, pod-internal)** is the canonical scrape target; port 7700 `/_miroir/metrics` (admin-auth) returns identical data for ad-hoc inspection from outside.\n\n## Acceptance\n\n- [ ] `curl localhost:9090/metrics | grep '^miroir_'` lists every metric name above\n- [ ] `curl -H \"Authorization: Bearer $ADMIN_KEY\" localhost:7700/_miroir/metrics` returns the same data\n- [ ] `path_template` labels contain no UUIDs or dynamic segments\n- [ ] A request that hits 3 nodes produces a `miroir_scatter_fan_out_size` histogram sample of 3","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","assignee":"","created_at":"2026-04-18T21:42:04.459011674Z","created_by":"coding","updated_at":"2026-05-23T06:29:31.165574257Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-7"]}
{"id":"miroir-afh.1","title":"P7.1 Core metrics families: requests, nodes, shards, tasks, scatter, rebalancer","description":"## What\n\nRegister the plan §10 core metric families on `:9090/metrics` AND `/_miroir/metrics` (admin-key gated mirror):\n\n**Requests** (histogram + counter + gauge):\n- `miroir_request_duration_seconds{method, path_template, status}`\n- `miroir_requests_total{method, path_template, status}`\n- `miroir_requests_in_flight`\n\n**Node health**:\n- `miroir_node_healthy{node_id}`\n- `miroir_node_request_duration_seconds{node_id, operation}`\n- `miroir_node_errors_total{node_id, error_type}`\n\n**Shards**:\n- `miroir_shard_coverage`\n- `miroir_degraded_shards_total`\n- `miroir_shard_distribution{node_id}`\n\n**Tasks**:\n- `miroir_task_processing_age_seconds`\n- `miroir_tasks_total{status}`\n- `miroir_task_registry_size`\n\n**Scatter-gather**:\n- `miroir_scatter_fan_out_size`\n- `miroir_scatter_partial_responses_total`\n- `miroir_scatter_retries_total`\n\n**Rebalancer**:\n- `miroir_rebalance_in_progress`\n- `miroir_rebalance_documents_migrated_total`\n- `miroir_rebalance_duration_seconds`\n\n## Why\n\nPlan §10 + Phase 9 dashboard + alerts all depend on these exact names. Naming is a contract — changing them post-v1.0 breaks every downstream dashboard + alert rule.\n\n## Details\n\n**Label cardinality defense**:\n- `path_template` MUST be the axum matched path (not the raw URL)\n- `node_id` is bounded (~dozens)\n- `status` is the HTTP status code (~10s)\n- `error_type` is enum-limited (not a raw error string)\n- `operation` is the backend call name ({search, documents_post, stats_get, ...})\n\n**Histogram buckets**: use prometheus default buckets for duration histograms unless the plan calls out specifics.\n\n**Port 9090 (unauth, pod-internal)** is the canonical scrape target; port 7700 `/_miroir/metrics` (admin-auth) returns identical data for ad-hoc inspection from outside.\n\n## Acceptance\n\n- [ ] `curl localhost:9090/metrics | grep '^miroir_'` lists every metric name above\n- [ ] `curl -H \"Authorization: Bearer $ADMIN_KEY\" localhost:7700/_miroir/metrics` returns the same data\n- [ ] `path_template` labels contain no UUIDs or dynamic segments\n- [ ] A request that hits 3 nodes produces a `miroir_scatter_fan_out_size` histogram sample of 3","design":"","acceptance_criteria":"","notes":"","status":"closed","priority":0,"issue_type":"task","created_at":"2026-04-18T21:42:04.459011674Z","created_by":"coding","updated_at":"2026-05-23T10:44:20.065841484Z","closed_at":"2026-05-23T10:44:20.065841484Z","close_reason":"Completed","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-7"]}
{"id":"miroir-afh.2","title":"P7.2 §13.11-21 metric families wired behind feature flags","description":"## What\n\nRegister the §13.1121 advanced-capabilities metric families (plan §10 \"Advanced capabilities metrics\") behind each feature's `enabled: true` flag:\n\n- Multi-search (§13.11): `miroir_multisearch_queries_per_batch`, `miroir_multisearch_batches_total`, `miroir_multisearch_partial_failures_total`, `miroir_tenant_session_pin_override_total{tenant}`\n- Vector (§13.12): `miroir_vector_search_over_fetched_total`, `miroir_vector_merge_strategy{strategy}`, `miroir_vector_embedder_drift_total`\n- CDC (§13.13): `miroir_cdc_events_published_total{sink,index}`, `miroir_cdc_lag_seconds{sink}`, `miroir_cdc_buffer_bytes{sink}`, `miroir_cdc_dropped_total{sink}`, `miroir_cdc_events_suppressed_total{origin}`\n- TTL (§13.14): `miroir_ttl_documents_expired_total{index}`, `miroir_ttl_sweep_duration_seconds{index}`, `miroir_ttl_pending_estimate{index}`\n- Tenant (§13.15): `miroir_tenant_queries_total{tenant,group}`, `miroir_tenant_pinned_groups{tenant}`, `miroir_tenant_fallback_total{reason}`\n- Shadow (§13.16): `miroir_shadow_diff_total{kind}`, `miroir_shadow_kendall_tau`, `miroir_shadow_latency_delta_seconds`, `miroir_shadow_errors_total{target,side}`\n- ILM (§13.17): `miroir_rollover_events_total{policy}`, `miroir_rollover_active_indexes{alias}`, `miroir_rollover_documents_expired_total{policy}`, `miroir_rollover_last_action_seconds{policy}`\n- Canary (§13.18): `miroir_canary_runs_total{canary,result}`, `miroir_canary_latency_ms{canary}`, `miroir_canary_assertion_failures_total{canary,assertion_type}`\n- Admin UI (§13.19): `miroir_admin_ui_sessions_total`, `miroir_admin_ui_action_total{action}`, `miroir_admin_ui_destructive_action_total{action}`\n- Explain (§13.20): `miroir_explain_requests_total`, `miroir_explain_warnings_total{warning_type}`, `miroir_explain_execute_total`\n- Search UI (§13.21): `miroir_search_ui_sessions_total`, `miroir_search_ui_queries_total{index}`, `miroir_search_ui_zero_hits_total{index}`, `miroir_search_ui_click_through_total{index}`, `miroir_search_ui_p95_ms{index}`\n\n## Why\n\nPlan §10 \"Grafana dashboard panels for these families will be added to `dashboards/miroir-overview.json` when the relevant feature flag is enabled; until then they are scrape-only.\" Gating by feature flag keeps the default scrape output compact for minimal deployments.\n\n## Details\n\n**Registration pattern**: each §13.x subsection's module owns its metrics `Lazy<Histogram>` / etc., registered into the global registry on first access (after `Config::validate` confirms the feature is enabled).\n\n**Label cardinality audit**: `{tenant}` and `{index}` are unbounded — document which metrics need dropping to cardinality caps (e.g., top 100 tenants reported individually, rest bucketed as \"other\"). Decide per metric during implementation; note decisions in feature-specific beads.\n\n## Acceptance\n\n- [ ] With all §13 flags off, `curl :9090/metrics | grep '^miroir_' | wc -l` is close to the Phase 7 P7.1 count (only core families emit)\n- [ ] With all §13 flags on, every family name above appears in the scrape\n- [ ] Label cardinality: any `{tenant}` or `{index}` metric bounded per its per-feature cap (not unlimited)","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-04-18T21:42:04.479172125Z","created_by":"coding","updated_at":"2026-04-18T21:42:08.230945305Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-7"],"dependencies":[{"issue_id":"miroir-afh.2","depends_on_id":"miroir-afh.1","type":"blocks","created_at":"2026-04-18T21:42:08.230920336Z","created_by":"coding","metadata":"{}","thread_id":""}]}
{"id":"miroir-afh.3","title":"P7.3 Grafana dashboard: dashboards/miroir-overview.json","description":"## What\n\nBuild the plan §10 Grafana dashboard at `dashboards/miroir-overview.json` with 8 panels:\n1. Cluster health — degraded shards, node healthy table\n2. Request rate — by path template\n3. p50/p95/p99 latency\n4. Node latency comparison — per-node histogram quantiles\n5. Search overhead — Miroir vs. single-node Meilisearch ratio\n6. Task lag — stuck task age\n7. Shard distribution — imbalance detection\n8. Rebalance activity\n\nPlus conditional feature-flag-gated rows for:\n- §13.1 resharding in progress + phase gauge\n- §13.5 settings broadcast phase + drift repairs\n- §13.8 anti-entropy shards scanned, mismatches found, docs repaired\n- §13.13 CDC lag, buffer bytes, events by sink\n- §13.18 canary pass/fail heatmap\n- §13.21 search UI sessions + p95\n\n## Why\n\nPlan §10 + §12 list the dashboard as a delivered artifact. A sample dashboard shipped in the repo means operators don't reinvent it for each install — they import and customize.\n\n## Details\n\n**Prometheus data source**: parametrized via `$datasource` variable so operators point at their cluster's Prometheus.\n\n**Row visibility**: use Grafana's \"template variable\" controlling row visibility — set automatic via `enabled_feature` label on metrics (or via a separate `miroir_feature_enabled{feature}` gauge) so rows auto-show when scraped.\n\n**Timezone**: default `browser`; 1-minute refresh; 1-hour default time range.\n\n**Import flow**: `helm install` optional `dashboards.enabled: true` creates a ConfigMap with the JSON labeled `grafana_dashboard=1` so Grafana's sidecar auto-imports.\n\n## Acceptance\n\n- [ ] `dashboards/miroir-overview.json` imports into a stock Grafana v10.x without errors\n- [ ] Every panel renders data against a live Miroir scrape in Phase 9 integration cluster\n- [ ] Feature-gated rows hide when their metrics are absent; show when present","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","created_at":"2026-04-18T21:42:04.502212851Z","created_by":"coding","updated_at":"2026-04-18T21:42:08.270363421Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-7"],"dependencies":[{"issue_id":"miroir-afh.3","depends_on_id":"miroir-afh.1","type":"blocks","created_at":"2026-04-18T21:42:08.247243544Z","created_by":"coding","metadata":"{}","thread_id":""},{"issue_id":"miroir-afh.3","depends_on_id":"miroir-afh.2","type":"blocks","created_at":"2026-04-18T21:42:08.270326589Z","created_by":"coding","metadata":"{}","thread_id":""}]}
{"id":"miroir-afh.4","title":"P7.4 ServiceMonitor + PrometheusRule (alerts) manifests","description":"## What\n\nShip the plan §10 + §14.9 alerting rules via `PrometheusRule` and the metric-scraping via `ServiceMonitor`.\n\n## ServiceMonitor (plan §10)\n\n```yaml\napiVersion: monitoring.coreos.com/v1\nkind: ServiceMonitor\nmetadata:\n name: miroir\nspec:\n selector: { matchLabels: { app.kubernetes.io/name: miroir, app.kubernetes.io/component: metrics } }\n endpoints:\n - port: metrics\n interval: 30s\n path: /metrics\n```\n\n## PrometheusRule (plan §10 + §14.9)\n\nAlerts (all 12 from plan):\n\n### Availability (plan §10)\n1. `MiroirDegradedShards` — `miroir_degraded_shards_total > 0` for 2m\n2. `MiroirNodeDown` — `miroir_node_healthy == 0` for 5m\n3. `MiroirHighSearchLatency` — p95 > 2s for 5m\n4. `MiroirTaskStuck` — `miroir_task_processing_age_seconds > 3600` for 10m\n5. `MiroirRebalanceStuck` — `miroir_rebalance_in_progress == 1` for 2h\n6. `MiroirSettingsDivergence` — paired with §13.5 auto-repair (plan §10 description)\n7. `MiroirAntientropyMismatch` — paired with §13.8 at 3 consecutive passes (~18h default schedule)\n\n### Resource pressure (plan §14.9)\n8. `MiroirMemoryPressure` — `miroir_memory_pressure >= 2` for 5m\n9. `MiroirRequestQueueBacklog` — `miroir_request_queue_depth > 500` for 2m\n10. `MiroirBackgroundJobBacklog` — `miroir_background_queue_depth > 100` for 10m\n11. `MiroirPeerDiscoveryGap` — peer mismatch for 2m\n12. `MiroirNoLeader` — `sum(miroir_leader) == 0` for 1m\n\n## Why\n\nAlert rules are part of the shipped product, not something operators have to write. Plan §10 is explicit: the rules fire \"only when the self-healing paths described [in §13.5 / §13.8] failed to close the gap on their own\" — so noise is minimized and every page is actionable.\n\n## Details\n\n**Helm flag**: `miroir.serviceMonitor.enabled: false` default (only render when operator opts in, requires prometheus-operator in cluster). Same for `miroir.prometheusRule.enabled: false`.\n\n**Alert routing**: operators wire to their own Alertmanager — Miroir doesn't ship routing config.\n\n## Acceptance\n\n- [ ] `helm template` with `serviceMonitor.enabled: true` renders a valid ServiceMonitor manifest\n- [ ] All 12 alerts present in the rendered PrometheusRule\n- [ ] Each alert tripped at least once in Phase 9 chaos tests (where applicable)","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","created_at":"2026-04-18T21:42:04.550227072Z","created_by":"coding","updated_at":"2026-04-18T21:42:08.287321683Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-7"],"dependencies":[{"issue_id":"miroir-afh.4","depends_on_id":"miroir-afh.1","type":"blocks","created_at":"2026-04-18T21:42:08.287293376Z","created_by":"coding","metadata":"{}","thread_id":""}]}
@ -56,12 +56,12 @@
{"id":"miroir-m9q.1","title":"P6.1 Pod resource envelope + limits/requests","description":"## What\n\nImplement pod sizing per plan §14.1 + §14.2 + §14.8:\n- Helm `deployment.yaml` sets `resources.requests = {cpu: 500m, memory: 1Gi}`\n- `resources.limits = {cpu: 2000m, memory: 3584Mi}` (plan §14.8: \"leaves headroom under 3.75 GB node limit\")\n- Config defaults sized for the envelope (§14.8 full YAML)\n\n## Why\n\nPlan §1 principle 8: \"Fixed per-pod resource envelope (2 vCPU / 3.75 GB). When aggregate workload exceeds this envelope, scale **horizontally** by adding pods, never vertically beyond the envelope.\"\n\nWithout enforced limits, a runaway per-feature cache (e.g., session_pinning.max_sessions set unreasonably high) can push a pod into OOM-kill territory, inviting HPA to spin up replacements instead of surfacing the misconfiguration.\n\n## Details\n\n**Per-feature memory rows** (plan §14.2) each need their defaults:\n\n| Component | Budget | Knob |\n|-----------|--------|------|\n| Runtime + axum | 80 MB | — |\n| HTTP/2 pools | 50 MB | `connection_pool_per_node` |\n| Req/resp buffers | 200 MB | `server.max_body_bytes`, `max_concurrent_requests` |\n| Task registry | 100 MB | `task_registry.cache_size` |\n| Idempotency | 100 MB | `idempotency.max_cached_keys` |\n| Sessions | 50 MB | `session_pinning.max_sessions` |\n| Coalescing | 50 MB | `query_coalescing.max_subscribers` |\n| Router + EWMA | 20 MB | fixed |\n| Plan cache | 20 MB | fixed |\n| Alias table | 10 MB | fixed |\n| Metrics | 50 MB | fixed |\n| Dump import buffer | 128 MB | `dump_import.memory_buffer_bytes` (only during import) |\n| Anti-entropy | 128 MB | `anti_entropy.max_read_concurrency` (only during pass) |\n| Multi-search scratch | 5 MB | `multi_search.max_queries_per_batch` |\n| Vector over-fetch | 30 MB | `vector_search.over_fetch_factor` |\n| CDC buffer | 64 MB | `cdc.buffer.memory_bytes` |\n| TTL cursor | 5 MB | — |\n| Tenant map LRU | 20 MB | `tenant_affinity.mode` |\n| Shadow tee | ~50 MB | `shadow.targets[].sample_rate` |\n| Canary state | 20 MB | `canary_runner.run_history_per_canary` |\n| Admin UI assets | 10 MB | fixed |\n| Explain cache | 10 MB | fixed |\n| Search UI assets | 10 MB | fixed |\n| Search UI rate limiter | 20 MB (Redis-backed) | — |\n| Allocator overhead | 800 MB | — |\n| **Steady-state total** | **~1.2 GB** | |\n\n**Regression budget**: add a CI check (Phase 9) that flags when steady-state under synthetic load exceeds 1.7 GB.\n\n## Acceptance\n\n- [ ] Helm rendered manifest matches the requests/limits above\n- [ ] Idle pod < 300 MB RSS on a 3-node cluster\n- [ ] Steady-state (1 kQPS across 3 Miroir pods) under 1.2 GB per pod\n- [ ] One heavy background job (dump import) adds < 500 MB to that pod's total","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","created_at":"2026-04-18T21:40:30.562386308Z","created_by":"coding","updated_at":"2026-04-18T21:40:30.562386308Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"]}
{"id":"miroir-m9q.2","title":"P6.2 Peer discovery via headless Service + Downward API","description":"## What\n\nImplement peer discovery per plan §14.5:\n- Helm `miroir-headless.yaml` — a headless Service with label selector on the Deployment\n- Deployment: Downward API injects `POD_NAME` + `POD_IP` as env vars\n- Each pod refreshes peer set every `peer_discovery.refresh_interval_s` (default 15s) via SRV lookup against `miroir-headless.<namespace>.svc.cluster.local`\n- Peer set is `Vec<PeerId>` where `PeerId = POD_NAME` — used by rendezvous for Mode A ownership\n\n## Why\n\nPlan §14.5: \"All three modes rely on the current peer set.\" Mode A rendezvous partitions by peer × work-item; Mode B leader election picks one peer; Mode C claim lease is by peer. Without a peer set, we'd need either a central registry (new dependency) or K8s API calls (requires RBAC + API server load).\n\nSRV-based discovery is zero-config — if headless Service exists, it just works.\n\n## Details\n\n**Manifest** (plan §14.5 + §6):\n```yaml\napiVersion: v1\nkind: Service\nmetadata:\n name: miroir-headless\nspec:\n clusterIP: None\n selector:\n app.kubernetes.io/name: miroir\n ports: [...]\n```\n\n**Env injection** (plan §14.5 \"Peer discovery\"):\n```yaml\nenv:\n- name: POD_NAME\n valueFrom: { fieldRef: { fieldPath: metadata.name } }\n- name: POD_IP\n valueFrom: { fieldRef: { fieldPath: status.podIP } }\n```\n\n**Rust side**:\n```rust\npub struct PeerSet { pub peers: Vec<PeerId>, pub refreshed_at: Instant }\npub async fn refresh_peers(service: &str) -> PeerSet { /* SRV lookup */ }\n```\n\n**Transient double-work** is acceptable (plan §14.5): \"15-second discovery window is harmless: anti-entropy is idempotent, settings-repair is idempotent.\"\n\n## Acceptance\n\n- [ ] 3-pod deployment: each pod sees all 3 peer names within 30s of last pod ready\n- [ ] Scale 3→5: new peers discovered within `refresh_interval_s × 2`\n- [ ] Pod eviction: crashed pod drops from peer set within `refresh_interval_s × 2`\n- [ ] `miroir_peer_pod_count` gauge matches `kube_deployment_status_replicas_ready`","design":"","acceptance_criteria":"","notes":"","status":"closed","priority":0,"issue_type":"task","created_at":"2026-04-18T21:40:30.582753605Z","created_by":"coding","updated_at":"2026-05-23T06:59:26.560430986Z","closed_at":"2026-05-23T06:59:26.560430986Z","close_reason":"P6.2 Peer discovery implementation verified complete.\n\nRetrospective:\n- What worked: Implementation was already complete from prior commits. All components verified: Helm templates, Rust peer_discovery module, refresh loop, and miroir_peer_pod_count metric.\n- What didn't: No issues encountered. Verification script expects running service for full testing.\n- Surprise: Helm template auto-derives service_name using same miroir.fullname template as headless Service, ensuring they always match.\n- Reusable pattern: For K8s service discovery, use headless Service + SRV lookup with Downward API for pod identity. Avoids K8s API calls and works across distributions via standard DNS.\n\nAcceptance Criteria Status:\nLocal verification complete. Integration tests require multi-pod K8s deployment:\n1. 3-pod deployment: each pod sees all 3 peer names within 30s\n2. Scale 3→5: new peers discovered within 30s\n3. Pod eviction: crashed pod drops from peer set within 30s\n4. miroir_peer_pod_count matches kube_deployment_status_replicas_ready","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"]}
{"id":"miroir-m9q.3","title":"P6.3 Mode A: shard-partitioned ownership (anti-entropy, drift, TTL, canaries, pruner)","description":"## What\n\nImplement plan §14.5 Mode A rendezvous-partitioned ownership:\n```\nowns(shard_or_item, pod) = pod == top1_by_score(hash(item || pid) for pid in peer_set)\n```\n\nApplied to:\n- §13.8 anti-entropy reconciler — each pod fingerprints/repairs owned shards\n- §13.5 settings drift checker — each pod polls subset of (index, node) settings-hash pairs\n- Task registry pruner — each pod prunes tasks it owns by `top1_by_score(hash(miroir_id || pid))`\n- §13.14 TTL sweeper — each pod sweeps owned shards\n- §13.18 canary runner — each canary ID rendezvous-owned by one pod per interval\n\n## Why\n\nPlan §14.5: \"No explicit handoff — the new owner runs the next scheduled pass. Transient double-work during a 15-second discovery window is harmless.\" Mode A is naturally horizontal (work scales with peer count) and idempotent (safe during rescheduling).\n\n## Details\n\n**Ownership function** (reuses Phase 1 `score` with item:pod keys instead of shard:node):\n```rust\npub fn owns<T: Hash>(item: &T, self_pod: &PeerId, peers: &[PeerId]) -> bool {\n peers.iter()\n .max_by_key(|pid| score_item_peer(item, pid))\n .map_or(false, |top| top == self_pod)\n}\n```\n\n**Scheduled runs**: each Mode A worker is a tokio task with a tick interval. On tick:\n1. Refresh peer set\n2. For each eligible item, check `owns(item, self)` and process if so\n3. Record progress per-item so rescheduling mid-run resumes cleanly\n\n**Phase 5 integration**: each §13.x subsection that declared \"Mode A\" in plan §14.6 calls into this layer rather than implementing its own peer-partitioning.\n\n## Acceptance\n\n- [ ] 3 pods running anti-entropy: each shard processed exactly once per interval cluster-wide\n- [ ] Kill one pod mid-pass: its shards reassigned to other peers within `refresh_interval_s × 2`; no shard processed by two pods simultaneously beyond the 15s window\n- [ ] Unit test: `owns()` returns true for exactly one peer per item across the peer set\n- [ ] Integration: induce divergence; Mode A anti-entropy converges across 3 pods with no double-repair","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","created_at":"2026-04-18T21:40:30.605342882Z","created_by":"coding","updated_at":"2026-04-18T21:40:36.034993157Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"],"dependencies":[{"issue_id":"miroir-m9q.3","depends_on_id":"miroir-m9q.2","type":"blocks","created_at":"2026-04-18T21:40:36.034974102Z","created_by":"coding","metadata":"{}","thread_id":""}],"comments":[{"id":2,"issue_id":"miroir-m9q.3","author":"cli","text":"## Related documentation\n\n- [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) — Full mapping of all §13.x features to scaling modes (A/B/C/stateless)\n- [Plan §14.5](https://github.com/jedarden/miroir/blob/main/docs/plan/plan.md#145-horizontal-scaling-background-work) — Mode A/B/C implementation details\n","created_at":"2026-05-20T10:53:12.916846335Z"},{"id":5,"issue_id":"miroir-m9q.3","author":"cli","text":"Cross-reference: See [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) for the complete mapping of §13.x capabilities to scaling modes. This bead implements Mode A (shard-partitioned ownership) for anti-entropy, drift checking, TTL sweeper, and canary runner.","created_at":"2026-05-20T10:58:15.476718864Z"},{"id":8,"issue_id":"miroir-m9q.3","author":"cli","text":"Cross-reference: [Per-Feature Scaling Behavior](docs/horizontal-scaling/per-feature.md) documents the full mapping of all §13.x capabilities to their scaling modes (A/B/C/stateless/per-pod).","created_at":"2026-05-20T11:12:19.649912904Z"}]}
{"id":"miroir-m9q.4","title":"P6.4 Mode B: leader-only singleton coordinator (reshard, rebalance, alias flip, 2PC, ILM, scoped-key rotation)","description":"## What\n\nImplement plan §14.5 Mode B leader-only lease:\n- SQLite: advisory lock row in `leader_lease` (plan §4) — the lease holder is recorded so recovery reads the last committed phase state\n- Redis: `SET <key> <pod_id> NX EX 10` renewed every 3s\n- Leader-loss mid-operation: pause; new leader reads persisted phase state and resumes at the last committed phase boundary\n- All Mode B operations are designed to be **idempotent** and safe to resume at phase boundaries\n\nLease scopes (plan §14.6):\n- §13.1 reshard coordinator: `reshard:<index>`\n- Phase 4 rebalancer: `rebalance:<index>` (or global `rebalance`)\n- §13.7 alias flip serializer: `alias_flip:<name>`\n- §13.5 two-phase settings broadcast: `settings_broadcast:<index>`\n- §13.17 ILM evaluator: `ilm`\n- §13.21 scoped-key rotation: `search_ui_key_rotation:<index>`\n\n## Why\n\nPlan §14.5: \"Leader loss mid-operation causes a pause; the new leader reads the persisted phase state from the task store and resumes from the last committed phase. All operations are idempotent by design and safe to resume at any phase boundary.\"\n\nWithout lease-based coordination, two pods could each run a reshard on the same index simultaneously → double shadow creation, conflicting alias flips, data corruption.\n\n## Details\n\n**Lease renewal**: every 3s (`leader_election.renew_interval_s`); TTL 10s (`leader_election.lease_ttl_s`). If renewal fails, leader gives up voluntarily to reduce split-brain.\n\n**Phase state persistence**: each Mode B operation persists enough state after each phase so resumption picks up where the dead leader left off:\n- Reshard: current phase ∈ {shadow, backfill, verify, swap, cleanup} + per-shard cursor\n- 2PC broadcast: current phase ∈ {propose, verify, commit} + per-node ACK list\n- ILM: per-policy next-check-time + in-flight rollover state\n\n**Config**:\n```yaml\nleader_election:\n enabled: true # auto-true when replicas > 1\n lease_ttl_s: 10\n renew_interval_s: 3\n```\n\n**SQLite substitute**: for single-pod dev, the `leader_lease` row is still written (so recovery can read the last committed phase state after a crash); lease semantics reduced to \"always-leader.\"\n\n**Metrics**: `miroir_leader` gauge (1 if this pod is leader, 0 otherwise).\n\n## Acceptance\n\n- [ ] 3 pods: exactly one is leader at any instant; killing it promotes another within `lease_ttl_s`\n- [ ] Kill the leader during reshard phase 3 (verify); new leader resumes at phase 3, not phase 1\n- [ ] Kill the leader during 2PC phase 2 (verify); new leader resumes verify without re-applying phase 1\n- [ ] `miroir_leader` sum across all pods is always 1 (or 0 transiently during failover)","design":"","acceptance_criteria":"","notes":"","status":"in_progress","priority":0,"issue_type":"task","assignee":"claude-code-glm-4.7-echo","created_at":"2026-04-18T21:40:30.638856024Z","created_by":"coding","updated_at":"2026-05-23T08:36:30.815594734Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"],"dependencies":[{"issue_id":"miroir-m9q.4","depends_on_id":"miroir-m9q.2","type":"blocks","created_at":"2026-04-18T21:40:36.064226657Z","created_by":"coding","metadata":"{}","thread_id":""}],"comments":[{"id":3,"issue_id":"miroir-m9q.4","author":"cli","text":"## Related documentation\n\n- [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) — Full mapping of all §13.x features to scaling modes (A/B/C/stateless)\n- [Plan §14.5](https://github.com/jedarden/miroir/blob/main/docs/plan/plan.md#145-horizontal-scaling-background-work) — Mode A/B/C implementation details\n","created_at":"2026-05-20T10:53:12.939925852Z"},{"id":6,"issue_id":"miroir-m9q.4","author":"cli","text":"Cross-reference: See [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) for the complete mapping of §13.x capabilities to scaling modes. This bead implements Mode B (leader-only singleton coordinator) for reshard, rebalance, alias flip, 2PC, ILM, and scoped-key rotation.","created_at":"2026-05-20T10:58:15.503766257Z"},{"id":9,"issue_id":"miroir-m9q.4","author":"cli","text":"Cross-reference: [Per-Feature Scaling Behavior](docs/horizontal-scaling/per-feature.md) documents the full mapping of all §13.x capabilities to their scaling modes (A/B/C/stateless/per-pod).","created_at":"2026-05-20T11:12:19.668827583Z"}]}
{"id":"miroir-m9q.5","title":"P6.5 Mode C: work-queued chunked jobs (dump import, reshard backfill)","description":"## What\n\nImplement plan §14.5 Mode C work-queued chunked jobs:\n- `jobs` table (Phase 3) with states `queued | in_progress | completed | failed`\n- Any pod can `claim_job(pod_id)` — atomic compare-and-swap `claimed_by IS NULL → claimed_by = pod_id`\n- Claim TTL: `claim_expires_at`, heartbeat every 10s, timeout 30s — pod loss → claim expires → another picks up\n- Large jobs **split into chunks** on input boundaries by the first pod that picks them up\n- Per-chunk progress persisted so crashed claims resume at last committed offset (idempotent via primary keys)\n\nApplied to:\n- §13.9 streaming dump import — chunks on NDJSON line boundaries, `chunk_size_bytes` default 256 MiB\n- §13.1 reshard backfill — partitions by shard-id range\n\n## Why\n\nPlan §14.5: \"Heavy streaming operations can exceed a single pod's envelope.\" A 500 GB dump is easily 10× a pod's memory budget — must chunk.\n\nPlan §14.4 HPA: `miroir_background_queue_depth` gauge → HPA scales out when backlog grows; scales back in when drained.\n\n## Details\n\n**Chunking**: first pod that picks up a large job inspects the input, computes split points, and re-enqueues per-chunk jobs. Original job transitions to `in_progress` with progress = \"splitting\" → \"delegated\" when chunks enqueued.\n\n**Claim heartbeat**: `UPDATE jobs SET claim_expires_at = now + 30s WHERE id = ? AND claimed_by = ?` — succeeds only if we still hold it. Pod crash → no heartbeat → next lease expiry releases claim.\n\n**Idempotent resume**: chunks record `{bytes_processed, docs_routed, last_cursor}`. A resumed chunk starts at `last_cursor` and re-writes docs (PK-idempotent at Meilisearch level → no dupes).\n\n**Queue depth metric**: `miroir:jobs:_queued` set; `SCARD miroir:jobs:_queued` = `miroir_background_queue_depth`. Fed to HPA as external metric per plan §14.4.\n\n**Config** tied to §13.9:\n```yaml\ndump_import:\n chunk_size_bytes: 268435456 # 256 MiB per §14.5 Mode C chunk-parallel coordinator\n```\n\n## Acceptance\n\n- [ ] 1 GB dump: first pod splits into 4× 256 MiB chunks; 3 pods claim 3 of 4 chunks in parallel; queue drains\n- [ ] Kill a claimant mid-chunk: claim expires in 30s; another pod picks up and resumes at `last_cursor`\n- [ ] HPA on `miroir_background_queue_depth > 10` triggers scale-up during the burst; scale-down once empty\n- [ ] Two concurrent dumps: chunks from both interleave in claims; neither starves","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","assignee":"","created_at":"2026-04-18T21:40:30.654570336Z","created_by":"coding","updated_at":"2026-05-23T08:35:58.270854744Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"],"dependencies":[{"issue_id":"miroir-m9q.5","depends_on_id":"miroir-m9q.2","type":"blocks","created_at":"2026-04-18T21:40:36.099899160Z","created_by":"coding","metadata":"{}","thread_id":""}],"comments":[{"id":4,"issue_id":"miroir-m9q.5","author":"cli","text":"## Related documentation\n\n- [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) — Full mapping of all §13.x features to scaling modes (A/B/C/stateless)\n- [Plan §14.5](https://github.com/jedarden/miroir/blob/main/docs/plan/plan.md#145-horizontal-scaling-background-work) — Mode A/B/C implementation details\n","created_at":"2026-05-20T10:53:12.950953124Z"},{"id":7,"issue_id":"miroir-m9q.5","author":"cli","text":"Cross-reference: See [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) for the complete mapping of §13.x capabilities to scaling modes. This bead implements Mode C (work-queued chunked jobs) for dump import and reshard backfill.","created_at":"2026-05-20T10:58:15.518343138Z"},{"id":10,"issue_id":"miroir-m9q.5","author":"cli","text":"Cross-reference: [Per-Feature Scaling Behavior](docs/horizontal-scaling/per-feature.md) documents the full mapping of all §13.x capabilities to their scaling modes (A/B/C/stateless/per-pod).","created_at":"2026-05-20T11:12:19.680451775Z"}]}
{"id":"miroir-m9q.4","title":"P6.4 Mode B: leader-only singleton coordinator (reshard, rebalance, alias flip, 2PC, ILM, scoped-key rotation)","description":"## What\n\nImplement plan §14.5 Mode B leader-only lease:\n- SQLite: advisory lock row in `leader_lease` (plan §4) — the lease holder is recorded so recovery reads the last committed phase state\n- Redis: `SET <key> <pod_id> NX EX 10` renewed every 3s\n- Leader-loss mid-operation: pause; new leader reads persisted phase state and resumes at the last committed phase boundary\n- All Mode B operations are designed to be **idempotent** and safe to resume at phase boundaries\n\nLease scopes (plan §14.6):\n- §13.1 reshard coordinator: `reshard:<index>`\n- Phase 4 rebalancer: `rebalance:<index>` (or global `rebalance`)\n- §13.7 alias flip serializer: `alias_flip:<name>`\n- §13.5 two-phase settings broadcast: `settings_broadcast:<index>`\n- §13.17 ILM evaluator: `ilm`\n- §13.21 scoped-key rotation: `search_ui_key_rotation:<index>`\n\n## Why\n\nPlan §14.5: \"Leader loss mid-operation causes a pause; the new leader reads the persisted phase state from the task store and resumes from the last committed phase. All operations are idempotent by design and safe to resume at any phase boundary.\"\n\nWithout lease-based coordination, two pods could each run a reshard on the same index simultaneously → double shadow creation, conflicting alias flips, data corruption.\n\n## Details\n\n**Lease renewal**: every 3s (`leader_election.renew_interval_s`); TTL 10s (`leader_election.lease_ttl_s`). If renewal fails, leader gives up voluntarily to reduce split-brain.\n\n**Phase state persistence**: each Mode B operation persists enough state after each phase so resumption picks up where the dead leader left off:\n- Reshard: current phase ∈ {shadow, backfill, verify, swap, cleanup} + per-shard cursor\n- 2PC broadcast: current phase ∈ {propose, verify, commit} + per-node ACK list\n- ILM: per-policy next-check-time + in-flight rollover state\n\n**Config**:\n```yaml\nleader_election:\n enabled: true # auto-true when replicas > 1\n lease_ttl_s: 10\n renew_interval_s: 3\n```\n\n**SQLite substitute**: for single-pod dev, the `leader_lease` row is still written (so recovery can read the last committed phase state after a crash); lease semantics reduced to \"always-leader.\"\n\n**Metrics**: `miroir_leader` gauge (1 if this pod is leader, 0 otherwise).\n\n## Acceptance\n\n- [ ] 3 pods: exactly one is leader at any instant; killing it promotes another within `lease_ttl_s`\n- [ ] Kill the leader during reshard phase 3 (verify); new leader resumes at phase 3, not phase 1\n- [ ] Kill the leader during 2PC phase 2 (verify); new leader resumes verify without re-applying phase 1\n- [ ] `miroir_leader` sum across all pods is always 1 (or 0 transiently during failover)","design":"","acceptance_criteria":"","notes":"","status":"closed","priority":0,"issue_type":"task","created_at":"2026-04-18T21:40:30.638856024Z","created_by":"coding","updated_at":"2026-05-23T09:55:38.448646796Z","closed_at":"2026-05-23T09:55:38.448646796Z","close_reason":"P6.4 Mode B leader-only singleton coordinator verification complete. All 12 acceptance tests pass. Fixed LeaseState visibility warning.","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"],"dependencies":[{"issue_id":"miroir-m9q.4","depends_on_id":"miroir-m9q.2","type":"blocks","created_at":"2026-04-18T21:40:36.064226657Z","created_by":"coding","metadata":"{}","thread_id":""}],"comments":[{"id":3,"issue_id":"miroir-m9q.4","author":"cli","text":"## Related documentation\n\n- [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) — Full mapping of all §13.x features to scaling modes (A/B/C/stateless)\n- [Plan §14.5](https://github.com/jedarden/miroir/blob/main/docs/plan/plan.md#145-horizontal-scaling-background-work) — Mode A/B/C implementation details\n","created_at":"2026-05-20T10:53:12.939925852Z"},{"id":6,"issue_id":"miroir-m9q.4","author":"cli","text":"Cross-reference: See [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) for the complete mapping of §13.x capabilities to scaling modes. This bead implements Mode B (leader-only singleton coordinator) for reshard, rebalance, alias flip, 2PC, ILM, and scoped-key rotation.","created_at":"2026-05-20T10:58:15.503766257Z"},{"id":9,"issue_id":"miroir-m9q.4","author":"cli","text":"Cross-reference: [Per-Feature Scaling Behavior](docs/horizontal-scaling/per-feature.md) documents the full mapping of all §13.x capabilities to their scaling modes (A/B/C/stateless/per-pod).","created_at":"2026-05-20T11:12:19.668827583Z"}]}
{"id":"miroir-m9q.5","title":"P6.5 Mode C: work-queued chunked jobs (dump import, reshard backfill)","description":"## What\n\nImplement plan §14.5 Mode C work-queued chunked jobs:\n- `jobs` table (Phase 3) with states `queued | in_progress | completed | failed`\n- Any pod can `claim_job(pod_id)` — atomic compare-and-swap `claimed_by IS NULL → claimed_by = pod_id`\n- Claim TTL: `claim_expires_at`, heartbeat every 10s, timeout 30s — pod loss → claim expires → another picks up\n- Large jobs **split into chunks** on input boundaries by the first pod that picks them up\n- Per-chunk progress persisted so crashed claims resume at last committed offset (idempotent via primary keys)\n\nApplied to:\n- §13.9 streaming dump import — chunks on NDJSON line boundaries, `chunk_size_bytes` default 256 MiB\n- §13.1 reshard backfill — partitions by shard-id range\n\n## Why\n\nPlan §14.5: \"Heavy streaming operations can exceed a single pod's envelope.\" A 500 GB dump is easily 10× a pod's memory budget — must chunk.\n\nPlan §14.4 HPA: `miroir_background_queue_depth` gauge → HPA scales out when backlog grows; scales back in when drained.\n\n## Details\n\n**Chunking**: first pod that picks up a large job inspects the input, computes split points, and re-enqueues per-chunk jobs. Original job transitions to `in_progress` with progress = \"splitting\" → \"delegated\" when chunks enqueued.\n\n**Claim heartbeat**: `UPDATE jobs SET claim_expires_at = now + 30s WHERE id = ? AND claimed_by = ?` — succeeds only if we still hold it. Pod crash → no heartbeat → next lease expiry releases claim.\n\n**Idempotent resume**: chunks record `{bytes_processed, docs_routed, last_cursor}`. A resumed chunk starts at `last_cursor` and re-writes docs (PK-idempotent at Meilisearch level → no dupes).\n\n**Queue depth metric**: `miroir:jobs:_queued` set; `SCARD miroir:jobs:_queued` = `miroir_background_queue_depth`. Fed to HPA as external metric per plan §14.4.\n\n**Config** tied to §13.9:\n```yaml\ndump_import:\n chunk_size_bytes: 268435456 # 256 MiB per §14.5 Mode C chunk-parallel coordinator\n```\n\n## Acceptance\n\n- [ ] 1 GB dump: first pod splits into 4× 256 MiB chunks; 3 pods claim 3 of 4 chunks in parallel; queue drains\n- [ ] Kill a claimant mid-chunk: claim expires in 30s; another pod picks up and resumes at `last_cursor`\n- [ ] HPA on `miroir_background_queue_depth > 10` triggers scale-up during the burst; scale-down once empty\n- [ ] Two concurrent dumps: chunks from both interleave in claims; neither starves","design":"","acceptance_criteria":"","notes":"","status":"in_progress","priority":0,"issue_type":"task","assignee":"claude-code-glm-4.7-echo","created_at":"2026-04-18T21:40:30.654570336Z","created_by":"coding","updated_at":"2026-05-23T11:01:23.427700394Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"],"dependencies":[{"issue_id":"miroir-m9q.5","depends_on_id":"miroir-m9q.2","type":"blocks","created_at":"2026-04-18T21:40:36.099899160Z","created_by":"coding","metadata":"{}","thread_id":""}],"comments":[{"id":4,"issue_id":"miroir-m9q.5","author":"cli","text":"## Related documentation\n\n- [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) — Full mapping of all §13.x features to scaling modes (A/B/C/stateless)\n- [Plan §14.5](https://github.com/jedarden/miroir/blob/main/docs/plan/plan.md#145-horizontal-scaling-background-work) — Mode A/B/C implementation details\n","created_at":"2026-05-20T10:53:12.950953124Z"},{"id":7,"issue_id":"miroir-m9q.5","author":"cli","text":"Cross-reference: See [Per-Feature Scaling Behavior](https://github.com/jedarden/miroir/blob/main/docs/horizontal-scaling/per-feature.md) for the complete mapping of §13.x capabilities to scaling modes. This bead implements Mode C (work-queued chunked jobs) for dump import and reshard backfill.","created_at":"2026-05-20T10:58:15.518343138Z"},{"id":10,"issue_id":"miroir-m9q.5","author":"cli","text":"Cross-reference: [Per-Feature Scaling Behavior](docs/horizontal-scaling/per-feature.md) documents the full mapping of all §13.x capabilities to their scaling modes (A/B/C/stateless/per-pod).","created_at":"2026-05-20T11:12:19.680451775Z"}]}
{"id":"miroir-m9q.6","title":"P6.6 HPA spec + prometheus-adapter + schema validation","description":"## What\n\nShip the HPA spec (plan §14.4):\n```yaml\napiVersion: autoscaling/v2\nkind: HorizontalPodAutoscaler\nspec:\n minReplicas: 2\n maxReplicas: 24\n behavior:\n scaleDown: { stabilizationWindowSeconds: 300 }\n scaleUp: { stabilizationWindowSeconds: 30 }\n metrics:\n - Resource cpu 70%\n - Resource memory 75%\n - Pods miroir_requests_in_flight AverageValue: 500\n - External miroir_background_queue_depth Value: 10\n```\n\nChart preconditions enforced via `values.schema.json`:\n- `hpa.enabled: true` requires `replicas >= 2 AND taskStore.backend: redis`\n- `prometheus-adapter` (or equivalent) as a documented prerequisite when HPA is enabled\n\n## Why\n\nPlan §14.4: \"`miroir_requests_in_flight` is **per-pod** and uses `type: Pods`. `miroir_background_queue_depth` is **global** and must use `type: External` with `type: Value`.\" Getting the metric type wrong produces a pathological HPA that monotonically scales to `maxReplicas`.\n\n## Details\n\n**Per-workload-tier min/max** (plan §14.7):\n| Peak QPS | minReplicas | maxReplicas |\n|---|---|---|\n| ≤ 500 | 2 | 3 |\n| ≤ 2k | 2 | 4 |\n| ≤ 5k | 4 | 8 |\n| ≤ 20k | 8 | 12 |\n| ≤ 100k | 12 | 24 |\n\nDefault values.yaml ships the ≤ 5k tier; operators override per workload.\n\n**prometheus-adapter config**: add a ConfigMap-defined `rules.externalMetrics` entry mapping `miroir_background_queue_depth` to the external metrics API. This is NOT shipped by the Miroir chart (operators install prometheus-adapter separately); the chart's `NOTES.txt` calls it out.\n\n**Stabilization windows**: scale-up fast (30s), scale-down slow (300s). Avoids pod flapping.\n\n## Acceptance\n\n- [ ] `helm lint --strict` with `hpa.enabled: true + replicas: 1` → fails with schema error\n- [ ] `helm lint --strict` with `hpa.enabled: true + replicas: 2 + backend: sqlite` → fails\n- [ ] HPA in a kind cluster: induce CPU load → scales up within 30s; load drops → scales down after 300s\n- [ ] External metric binding: `miroir_background_queue_depth` visible via `kubectl get --raw /apis/external.metrics.k8s.io/v1beta1/...`","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","created_at":"2026-04-18T21:40:30.676597441Z","created_by":"coding","updated_at":"2026-04-18T21:40:36.163090876Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"],"dependencies":[{"issue_id":"miroir-m9q.6","depends_on_id":"miroir-m9q.4","type":"blocks","created_at":"2026-04-18T21:40:36.140248526Z","created_by":"coding","metadata":"{}","thread_id":""},{"issue_id":"miroir-m9q.6","depends_on_id":"miroir-m9q.5","type":"blocks","created_at":"2026-04-18T21:40:36.163063693Z","created_by":"coding","metadata":"{}","thread_id":""}]}
{"id":"miroir-m9q.7","title":"P6.7 Resource-pressure metrics + alerts (§14.9)","description":"## What\n\nRegister the plan §14.9 resource-pressure metrics:\n- `miroir_memory_pressure` gauge (0=ok, 1=warn >75%, 2=critical >90%)\n- `miroir_cpu_throttled_seconds_total` counter (cgroup throttling)\n- `miroir_request_queue_depth` gauge\n- `miroir_background_queue_depth{job_type}` gauge\n- `miroir_peer_pod_count` gauge\n- `miroir_leader` gauge\n- `miroir_owned_shards_count` gauge\n\nAnd the associated `PrometheusRule` alerts (plan §14.9).\n\n## Why\n\nThese surface under-scaling BEFORE user-visible impact. `miroir_memory_pressure` + `MiroirMemoryPressure` alert give operators (and HPA) a leading indicator instead of waiting for OOM-kill.\n\n## Details\n\n**cgroup reads**: on Linux, read `/sys/fs/cgroup/cpu.stat` (cgroup v2) or `/sys/fs/cgroup/cpu/cpu.stat` (v1) for `nr_throttled`/`throttled_time`. Convert throttled_time nanoseconds → seconds for the counter.\n\n**Memory pressure gauge**: read `/sys/fs/cgroup/memory.current` + `memory.max`; compute utilization; map to 0/1/2 per threshold.\n\n**PrometheusRule**:\n```yaml\n- alert: MiroirMemoryPressure\n expr: miroir_memory_pressure >= 2\n for: 5m\n- alert: MiroirRequestQueueBacklog\n expr: miroir_request_queue_depth > 500\n for: 2m\n- alert: MiroirBackgroundJobBacklog\n expr: miroir_background_queue_depth > 100\n for: 10m\n- alert: MiroirPeerDiscoveryGap\n expr: miroir_peer_pod_count < kube_deployment_status_replicas_ready{deployment=\"miroir\"}\n for: 2m\n- alert: MiroirNoLeader\n expr: sum(miroir_leader) == 0\n for: 1m\n```\n\n## Acceptance\n\n- [ ] All 7 metrics present on `:9090/metrics`\n- [ ] `miroir_memory_pressure` reports 2 when artificial allocation pushes RSS > 90% of limit\n- [ ] `MiroirNoLeader` fires after killing the leader without replacement within 1 min\n- [ ] `MiroirPeerDiscoveryGap` fires if headless Service misconfigured","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-04-18T21:40:30.711963985Z","created_by":"coding","updated_at":"2026-04-18T21:40:30.711963985Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-6"]}
{"id":"miroir-mkk","title":"Phase 4 — Topology Operations (rebalance, add/remove node + group, drain)","description":"## Phase 4 Epic — Topology Operations\n\nMakes the cluster *elastic*: operators can add or remove nodes within a group (capacity scaling) or add/remove entire replica groups (throughput scaling) without a full reindex and without downtime.\n\n## Why This Matters\n\nPlan §2 \"Topology changes\" and §4 \"Rebalancer\" together are **the** operational differentiator. Without this phase, Miroir is a static sharder — useful but not production-grade. Elasticity is what justifies the complexity of the whole system.\n\nPlan §15 Open Problem 1 (dual-write race) is partially mitigated by careful sequencing here and fully closed by §13.8 anti-entropy in Phase 5. Getting the sequencing right here means Phase 5's reconciler is a safety net, not the primary correctness mechanism.\n\n## Scope\n\n**Node addition (within a group; plan §2 \"Adding a node\")**\n\n1. Assign new node to a group; mark `joining`\n2. Recompute assignments — ~S/(Ng+1) shards move\n3. Dual-write: new inbound writes for affected shards go to **both** old owner and new node\n4. Background migration per shard: `GET /indexes/{uid}/documents?filter=_miroir_shard={id}&limit=1000&offset=...` → write each page to new node\n5. Mark `active`; stop dual-write; `POST /indexes/{uid}/documents/delete` with `filter=_miroir_shard={id}` on old owner\n\n**Replica-group addition (plan §2 \"Adding a new replica group\")** — mark `initializing`, background-sync from any healthy group using the same `_miroir_shard` filter, then flip to `active` and start routing queries.\n\n**Node removal (plan §2 \"Removing a node\")** — mark `draining`, recompute, migrate ~RF/Ng fraction to survivors, mark `removed`, operator deletes PVC.\n\n**Group removal (plan §2 \"Removing a replica group\")** — mark `draining`, stop routing queries; no data migration (other groups hold the docs); decommission.\n\n**Unplanned node failure (plan §2 \"Node failure\")** — mark `failed`; surviving intra-group replicas cover if RF>1; cross-group fallback if RF=1; schedule background replication to restore RF.\n\n**Admin API** (plan §4 admin table) — `POST /_miroir/nodes`, `DELETE /_miroir/nodes/{id}`, `POST /_miroir/nodes/{id}/drain`, `POST /_miroir/rebalance`, `GET /_miroir/rebalance/status`.\n\n## Design Notes\n\n- Relies on `_miroir_shard` being `filterable` on every node — set by Phase 2 index-create broadcast\n- Only one rebalance at a time per index (advisory lock → Phase 6 Mode B leader lease)\n- Chunked migration bounded by `rebalancer.max_concurrent_migrations` (default 4) to stay under the per-pod 3.75 GB envelope\n- Migration progress reported via `GET /_miroir/rebalance/status` and `miroir_rebalance_*` metrics (§10)\n- No full-corpus scans ever — the `_miroir_shard` filter is the key primitive; any code path that enumerates \"all docs\" is a bug\n\n## Open Problem Closure\n\nPlan §15 #1 — dual-write cutover race: document the exact sequencing here and note that §13.8 anti-entropy is the guaranteed safety net on the next pass.\n\n## Definition of Done\n\n- [ ] Chaos test: add a node mid-indexing — every doc remains readable; no duplicates on a subsequent search\n- [ ] Chaos test: drain a node while queries are in flight — zero client-visible failures; `X-Miroir-Degraded` absent or transient only\n- [ ] Chaos test: add a replica group while queries are in flight — existing groups unaffected; new group starts serving reads only after sync completes\n- [ ] Rebalance of a 3→4 node cluster moves ≤ 2×(1/4) of docs (optimal per plan §8 benches)\n- [ ] Restart a killed node mid-rebalance — rebalance pauses + resumes; no data loss","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"epic","assignee":"","created_at":"2026-04-18T21:19:53.993012197Z","created_by":"coding","updated_at":"2026-05-09T16:11:31.984602638Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase","phase-4"],"dependencies":[{"issue_id":"miroir-mkk","depends_on_id":"miroir-9dj","type":"blocks","created_at":"2026-04-18T21:23:08.595905334Z","created_by":"coding","metadata":"{}","thread_id":""},{"issue_id":"miroir-mkk","depends_on_id":"miroir-r3j","type":"blocks","created_at":"2026-04-18T21:23:08.609300009Z","created_by":"coding","metadata":"{}","thread_id":""}]}
{"id":"miroir-mkk.1","title":"P4.1 Rebalancer background worker + advisory lock","description":"## What\n\nImplement the rebalancer as a background Tokio task (plan §4 \"Rebalancer\"):\n- Advisory lock — only one Miroir instance runs the rebalancer at a time (Phase 6 §14.5 Mode B replaces with leader lease)\n- Reacts to topology change events (node add/drain/fail/recover) from the admin API + health checker\n- Computes affected shards (the `~S/(Ng+1)` or `~RF/Ng` delta) using the Phase 1 router\n- Drives the migration state machine for each affected shard\n- Updates `miroir_rebalance_in_progress`, `miroir_rebalance_documents_migrated_total`, `miroir_rebalance_duration_seconds` (plan §10)\n\n## Why\n\nThe rebalancer is the orchestrator of all Phase 4 operations. Everything else in this phase is a subroutine called by this worker. Keeping it as a dedicated task — rather than inline in admin handlers — means a slow migration doesn't block admin API responses and a crash restarts cleanly from the task-store state.\n\n## Details\n\n**State machine per-shard**:\n```\nIdle → DualWriteStarted → MigrationInProgress → MigrationComplete → DualWriteStopped → OldReplicaDeleted → Idle\n```\n\n**Concurrency bound**: `rebalancer.max_concurrent_migrations` (default 4) to stay within plan §14.2 memory budget for migration buffers.\n\n**Progress persistence**: per-shard cursor in `jobs` table (Phase 3) so a pod restart resumes at the last committed offset. Idempotent per primary key (same doc re-written on resume is no-op at Meilisearch level).\n\n**Cancellation**: an admin API call can pause (not delete) an in-progress rebalance; resuming picks up at the persisted cursor.\n\n## Acceptance\n\n- [ ] Advisory lock: two pods running the rebalancer simultaneously produce 0 duplicate migrations (enforced via the `leader_lease` row for scope `rebalance:<index>`)\n- [ ] Progress persistence: kill the pod mid-migration; another takes over within lease TTL and completes without starting over\n- [ ] Metrics tick: `miroir_rebalance_documents_migrated_total` monotonically increases; `_duration_seconds` histogram records per-shard migration time","design":"","acceptance_criteria":"","notes":"","status":"in_progress","priority":0,"issue_type":"task","assignee":"claude-code-glm-4.7-bravo","created_at":"2026-04-18T21:31:43.768256172Z","created_by":"coding","updated_at":"2026-05-23T08:35:07.260464820Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-4"]}
{"id":"miroir-mkk.1","title":"P4.1 Rebalancer background worker + advisory lock","description":"## What\n\nImplement the rebalancer as a background Tokio task (plan §4 \"Rebalancer\"):\n- Advisory lock — only one Miroir instance runs the rebalancer at a time (Phase 6 §14.5 Mode B replaces with leader lease)\n- Reacts to topology change events (node add/drain/fail/recover) from the admin API + health checker\n- Computes affected shards (the `~S/(Ng+1)` or `~RF/Ng` delta) using the Phase 1 router\n- Drives the migration state machine for each affected shard\n- Updates `miroir_rebalance_in_progress`, `miroir_rebalance_documents_migrated_total`, `miroir_rebalance_duration_seconds` (plan §10)\n\n## Why\n\nThe rebalancer is the orchestrator of all Phase 4 operations. Everything else in this phase is a subroutine called by this worker. Keeping it as a dedicated task — rather than inline in admin handlers — means a slow migration doesn't block admin API responses and a crash restarts cleanly from the task-store state.\n\n## Details\n\n**State machine per-shard**:\n```\nIdle → DualWriteStarted → MigrationInProgress → MigrationComplete → DualWriteStopped → OldReplicaDeleted → Idle\n```\n\n**Concurrency bound**: `rebalancer.max_concurrent_migrations` (default 4) to stay within plan §14.2 memory budget for migration buffers.\n\n**Progress persistence**: per-shard cursor in `jobs` table (Phase 3) so a pod restart resumes at the last committed offset. Idempotent per primary key (same doc re-written on resume is no-op at Meilisearch level).\n\n**Cancellation**: an admin API call can pause (not delete) an in-progress rebalance; resuming picks up at the persisted cursor.\n\n## Acceptance\n\n- [ ] Advisory lock: two pods running the rebalancer simultaneously produce 0 duplicate migrations (enforced via the `leader_lease` row for scope `rebalance:<index>`)\n- [ ] Progress persistence: kill the pod mid-migration; another takes over within lease TTL and completes without starting over\n- [ ] Metrics tick: `miroir_rebalance_documents_migrated_total` monotonically increases; `_duration_seconds` histogram records per-shard migration time","design":"","acceptance_criteria":"","notes":"","status":"in_progress","priority":0,"issue_type":"task","assignee":"claude-code-glm-4.7-bravo","created_at":"2026-04-18T21:31:43.768256172Z","created_by":"coding","updated_at":"2026-05-23T10:53:40.402272474Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-4"]}
{"id":"miroir-mkk.2","title":"P4.2 Node addition: dual-write + paginated shard migration","description":"## What\n\nImplement the node-addition flow from plan §2 \"Adding a node to an existing group\":\n1. Admin API: `POST /_miroir/nodes` body `{\"id\": \"meili-N\", \"address\": \"...\", \"replica_group\": G}`\n2. Mark `joining`\n3. Recompute assignments — `affected_shards` where `meili-N` enters the top-RF within group G\n4. **Dual-write**: new inbound writes for affected shards go to **both** old owner and new node (idempotent — Meilisearch PUT semantics handle dupes via primary key)\n5. For each affected shard, background migration via the shard-filter primitive (plan §4):\n ```\n GET /indexes/{uid}/documents?filter=_miroir_shard={shard_id}&limit=1000&offset=0\n GET /indexes/{uid}/documents?filter=_miroir_shard={shard_id}&limit=1000&offset=1000\n ... until exhausted\n ```\n6. Write each page to the new node (docs already carry `_miroir_shard`)\n7. Mark `active`; stop dual-write\n8. Delete migrated shard from old node: `POST /indexes/{uid}/documents/delete {\"filter\": \"_miroir_shard = {shard_id}\"}`\n9. Documents on unaffected shards never touched\n\n## Why\n\nPlan §1 principle 4 (RF-configurable redundancy) + §2 \"Three independent scaling dimensions\" depend on this. The `_miroir_shard` filter primitive is what makes migration move only `~total_docs/(N+1)` docs instead of `total_docs` — a 10100× reduction in I/O vs. a naive \"copy everything then diff\" approach.\n\n## Details\n\n**Dual-write durability invariant**: between steps 4 and 7, every accepted write for the affected shards lands on both old and new. If dual-write is skipped while migration is running, writes arriving at that exact moment may land only on the old owner and be lost when step 8 deletes. Plan §15 Open Problem 1 is the remaining race; §13.8 anti-entropy (Phase 5) is the safety net.\n\n**Pagination cursor**: `offset` is the simplest, but Meilisearch `limit + offset` has an internal cap (default 1000 + 0 → max ~20 for safe). Configure `pagination.maxTotalHits` per-node at index creation to allow deep pagination (safe: we're just iterating our own injected shard).\n\n**Per-page batch**: `rebalancer.migration_batch_size` (default 1000) — one page read + one page write per cycle.\n\n**Fail-open behavior**: if the source node becomes unavailable mid-migration, the rebalancer pauses this shard; other shards continue. When source comes back, resume.\n\n## Acceptance\n\n- [ ] Integration test: 3-node → 4-node migration, 10K docs, each doc still retrievable by ID after migration\n- [ ] Chaos: toggle writes on/off during migration; dual-write window catches all late writes\n- [ ] Performance: migrating `~S/(Ng+1)` shards moves ≤ `total_docs / (Ng+1) × 1.1` docs (10% slack for dual-write dupes)\n- [ ] The old node is not queried for the migrated shards after step 8 (verified via log inspection)","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","created_at":"2026-04-18T21:31:43.790167851Z","created_by":"coding","updated_at":"2026-04-18T21:31:48.930644191Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-4"],"dependencies":[{"issue_id":"miroir-mkk.2","depends_on_id":"miroir-mkk.1","type":"blocks","created_at":"2026-04-18T21:31:48.930624028Z","created_by":"coding","metadata":"{}","thread_id":""}]}
{"id":"miroir-mkk.3","title":"P4.3 Node removal (drain): migrate off + delete PVC handoff","description":"## What\n\nImplement `POST /_miroir/nodes/{id}/drain` + `DELETE /_miroir/nodes/{id}` (plan §2 \"Removing a node\"):\n1. Mark `draining`; stop routing writes for its affected shards to it\n2. Recompute assignments — affected shards reassigned to surviving nodes in the same group\n3. Background migration: copy affected shards to new owners via the `_miroir_shard` filter primitive\n4. Mark `removed`\n5. `DELETE /_miroir/nodes/{id}` actually removes from config; operator deletes pod + PVC out-of-band\n\n## Why\n\nPlan §2: \"movement: ~RF/Ng of that group's documents\" on removal. The drain API decouples \"stop taking writes\" (immediate) from \"delete the pod\" (operator decision) — gives operators room to verify before committing to hardware loss.\n\n## Details\n\n**Order matters**: drain → remove. `drain` is reversible (mark `active` again); `remove` is not. CLI (`miroir-ctl node drain meili-2` per plan §11) should pause and await confirmation before the remove step.\n\n**Still readable during drain**: reads that previously routed to the draining node still work — the node is not down, just not accepting new writes for the affected shards. Read traffic naturally drifts to the replacement replica via Phase 1 `covering_set` intra-group rotation.\n\n**Safety check**: refuse drain if it would drop a shard below RF=1 in its group AND the group has no healthy peer group to fall back to. Require `--force` to override.\n\n**Post-drain verification**: query `GET /indexes/{uid}/documents?filter=_miroir_shard={s}&limit=1` against the drained node — should return 0 results for every shard before `remove` is permitted.\n\n## Acceptance\n\n- [ ] 3-node RF=2 group: drain node-1; searches still succeed with zero degraded responses\n- [ ] After drain completes, `GET /indexes/{uid}/documents?filter=_miroir_shard={s}&limit=1` on node-1 returns 0 for every shard\n- [ ] `remove` without prior `drain` → 409 conflict with a message pointing at `drain` first\n- [ ] `--force` drain that would drop a shard to 0 replicas surfaces a loud warning before proceeding","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","created_at":"2026-04-18T21:31:43.815997915Z","created_by":"coding","updated_at":"2026-04-18T21:31:48.943083697Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-4"],"dependencies":[{"issue_id":"miroir-mkk.3","depends_on_id":"miroir-mkk.1","type":"blocks","created_at":"2026-04-18T21:31:48.943066166Z","created_by":"coding","metadata":"{}","thread_id":""}]}
{"id":"miroir-mkk.4","title":"P4.4 Replica group addition: initializing → active","description":"## What\n\nImplement the \"Adding a new replica group\" flow from plan §2:\n1. Provision new nodes; assign `replica_group: G_new` in config\n2. Mark new group `initializing`; queries NOT routed here\n3. Background sync: for each shard, copy all docs from **any** healthy existing group to the new group's nodes via `filter=_miroir_shard={id}` pagination; new inbound writes already fan out to the new group immediately\n4. When all shards synced, mark group `active` — queries begin routing in round-robin\n5. Existing groups continue serving queries throughout (zero read interruption)\n\n## Why\n\nPlan §2 \"Adding a new replica group (throughput scaling)\": adding a group multiplies query capacity without touching existing groups' data. This is the primary \"we need more search QPS\" lever. Unlike intra-group rebalance which moves a subset, group-add **copies** every shard to the new group — so the I/O is proportional to total corpus size, not `1/(Ng+1)`.\n\n## Details\n\n**Source group selection**: round-robin across existing `active` groups to spread read load during sync. Per-shard picks a different source so one group isn't hammered.\n\n**Write fan-out during sync**: new group already receives writes from step 3 onward. This is the durability guarantee — only the backfill window of historical data is transient.\n\n**Progress tracking**: per-shard cursor in `jobs` table; can be paused/resumed per Phase 6 Mode C.\n\n**Verification before `active`**: `GET /indexes/{uid}/stats` against new group → docs count within 0.1% of source group (allows for writes landing during sync). If higher variance, delay the flip and investigate.\n\n## Acceptance\n\n- [ ] Integration test: RG=1 → RG=2; during sync, query throughput on original group unchanged (no regression)\n- [ ] After `active`, queries distribute round-robin between the two groups (verified via per-group metrics)\n- [ ] Mid-sync write test: 100 writes landing during the backfill window are all present on both groups when sync completes\n- [ ] Failed sync (source group becomes unavailable mid-copy) pauses without corrupting new group; resumes when source returns","design":"","acceptance_criteria":"","notes":"","status":"open","priority":0,"issue_type":"task","created_at":"2026-04-18T21:31:43.859158013Z","created_by":"coding","updated_at":"2026-04-18T21:31:48.961616587Z","source_repo":".","compaction_level":0,"original_size":0,"labels":["phase-4"],"dependencies":[{"issue_id":"miroir-mkk.4","depends_on_id":"miroir-mkk.1","type":"blocks","created_at":"2026-04-18T21:31:48.961576914Z","created_by":"coding","metadata":"{}","thread_id":""}]}

View file

@ -3,13 +3,13 @@
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 407520,
"exit_code": 0,
"outcome": "success",
"duration_ms": 383817,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-23T06:29:31.150448639Z",
"captured_at": "2026-05-23T10:44:33.548974174Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -3,13 +3,13 @@
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 1,
"outcome": "failure",
"duration_ms": 315458,
"exit_code": 124,
"outcome": "timeout",
"duration_ms": 600001,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-23T08:35:58.256900483Z",
"captured_at": "2026-05-23T11:01:23.370535705Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

View file

@ -1,2 +0,0 @@
SessionEnd hook [/home/coding/.ccdash/hooks/session-end.sh] failed: /bin/sh: line 1: /home/coding/.ccdash/hooks/session-end.sh: cannot execute: required file not found

File diff suppressed because one or more lines are too long

View file

@ -9,7 +9,7 @@
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-23T08:25:07.054276895Z",
"captured_at": "2026-05-23T11:03:40.564308197Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -1 +1 @@
2b5e80e93a6b2bb1633afad18a571cb561aa8b09
d2b79c4d54110207292fa9de87e53799d5251591

View file

@ -0,0 +1,826 @@
//! Acceptance tests for Mode C work-queued chunked jobs (P6.5).
//!
//! These tests verify the key acceptance criteria from plan §14.5:
//! 1. Large dumps split into chunks; multiple pods claim chunks in parallel
//! 2. Claim expiration: pod crash → claim expires → another pod resumes at last_cursor
//! 3. HPA queue depth metric drives autoscaling
//! 4. Concurrent dumps interleave without starvation
use super::*;
use crate::error::Result;
use crate::mode_c_coordinator::{JobChunk, JobParams, JobProgress, JobType, ModeCCoordinator};
use crate::task_store::{JobRow, NewJob, TaskStore};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::sleep;
/// Create a test coordinator with in-memory task store.
fn test_coordinator(pod_id: &str) -> ModeCCoordinator {
let store = Arc::new(MockTaskStore::new());
ModeCCoordinator::new(store, pod_id.to_string())
}
/// Create a test coordinator with a shared task store (for multi-pod tests).
fn test_coordinator_with_store(pod_id: &str, store: Arc<MockTaskStore>) -> ModeCCoordinator {
ModeCCoordinator::new(store, pod_id.to_string())
}
/// Mock task store for testing.
struct MockTaskStore {
jobs: Arc<std::sync::Mutex<Vec<JobRow>>>,
}
impl MockTaskStore {
fn new() -> Self {
Self {
jobs: Arc::new(std::sync::Mutex::new(Vec::new())),
}
}
}
impl TaskStore for MockTaskStore {
fn migrate(&self) -> Result<()> {
Ok(())
}
fn insert_job(&self, job: &NewJob) -> Result<()> {
let mut jobs = self.jobs.lock().unwrap();
jobs.push(JobRow {
id: job.id.clone(),
type_: job.type_.clone(),
params: job.params.clone(),
state: job.state.clone(),
claimed_by: None,
claim_expires_at: None,
progress: job.progress.clone(),
parent_job_id: job.parent_job_id.clone(),
chunk_index: job.chunk_index,
total_chunks: job.total_chunks,
created_at: Some(job.created_at),
});
Ok(())
}
fn get_job(&self, id: &str) -> Result<Option<JobRow>> {
let jobs = self.jobs.lock().unwrap();
Ok(jobs.iter().find(|j| j.id == id).cloned())
}
fn claim_job(&self, id: &str, claimed_by: &str, claim_expires_at: i64) -> Result<bool> {
let mut jobs = self.jobs.lock().unwrap();
if let Some(job) = jobs.iter_mut().find(|j| j.id == id && j.state == "queued") {
job.state = "in_progress".to_string();
job.claimed_by = Some(claimed_by.to_string());
job.claim_expires_at = Some(claim_expires_at);
Ok(true)
} else {
Ok(false)
}
}
fn update_job_progress(&self, id: &str, state: &str, progress: &str) -> Result<bool> {
let mut jobs = self.jobs.lock().unwrap();
if let Some(job) = jobs.iter_mut().find(|j| j.id == id) {
job.state = state.to_string();
job.progress = progress.to_string();
Ok(true)
} else {
Ok(false)
}
}
fn renew_job_claim(&self, id: &str, claim_expires_at: i64) -> Result<bool> {
let mut jobs = self.jobs.lock().unwrap();
if let Some(job) = jobs.iter_mut().find(|j| j.id == id && j.state == "in_progress") {
job.claim_expires_at = Some(claim_expires_at);
Ok(true)
} else {
Ok(false)
}
}
fn list_jobs_by_state(&self, state: &str) -> Result<Vec<JobRow>> {
let jobs = self.jobs.lock().unwrap();
Ok(jobs.iter().filter(|j| j.state == state).cloned().collect())
}
fn count_jobs_by_state(&self, state: &str) -> Result<u64> {
let jobs = self.jobs.lock().unwrap();
Ok(jobs.iter().filter(|j| j.state == state).count() as u64)
}
fn list_expired_claims(&self, now_ms: i64) -> Result<Vec<JobRow>> {
let jobs = self.jobs.lock().unwrap();
Ok(jobs
.iter()
.filter(|j| {
j.state == "in_progress"
&& j
.claim_expires_at
.map_or(false, |exp| exp < now_ms)
})
.cloned()
.collect())
}
fn list_jobs_by_parent(&self, parent_job_id: &str) -> Result<Vec<JobRow>> {
let jobs = self.jobs.lock().unwrap();
Ok(jobs
.iter()
.filter(|j| j.parent_job_id.as_deref() == Some(parent_job_id))
.cloned()
.collect())
}
fn reclaim_job_claim(&self, id: &str, state: &str, progress: &str) -> Result<bool> {
let mut jobs = self.jobs.lock().unwrap();
if let Some(job) = jobs.iter_mut().find(|j| j.id == id) {
job.state = state.to_string();
job.progress = progress.to_string();
job.claimed_by = None;
job.claim_expires_at = None;
Ok(true)
} else {
Ok(false)
}
}
// Stub implementations for unused trait methods
fn insert_task(&self, _task: &crate::task_store::NewTask) -> Result<()> {
Ok(())
}
fn get_task(&self, _miroir_id: &str) -> Result<Option<crate::task_store::TaskRow>> {
Ok(None)
}
fn update_task_status(&self, _miroir_id: &str, _status: &str) -> Result<bool> {
Ok(false)
}
fn update_node_task(&self, _miroir_id: &str, _node_id: &str, _task_uid: u64) -> Result<bool> {
Ok(false)
}
fn set_task_error(&self, _miroir_id: &str, _error: &str) -> Result<bool> {
Ok(false)
}
fn list_tasks(&self, _filter: &crate::task_store::TaskFilter) -> Result<Vec<crate::task_store::TaskRow>> {
Ok(Vec::new())
}
fn prune_tasks(&self, _cutoff_ms: i64, _batch_size: u32) -> Result<usize> {
Ok(0)
}
fn task_count(&self) -> Result<u64> {
Ok(0)
}
fn upsert_node_settings_version(
&self,
_index_uid: &str,
_node_id: &str,
_version: i64,
_updated_at: i64,
) -> Result<()> {
Ok(())
}
fn get_node_settings_version(
&self,
_index_uid: &str,
_node_id: &str,
) -> Result<Option<crate::task_store::NodeSettingsVersionRow>> {
Ok(None)
}
fn create_alias(&self, _alias: &crate::task_store::NewAlias) -> Result<()> {
Ok(())
}
fn get_alias(&self, _name: &str) -> Result<Option<crate::task_store::AliasRow>> {
Ok(None)
}
fn flip_alias(&self, _name: &str, _new_uid: &str, _history_retention: usize) -> Result<bool> {
Ok(false)
}
fn delete_alias(&self, _name: &str) -> Result<bool> {
Ok(false)
}
fn list_aliases(&self) -> Result<Vec<crate::task_store::AliasRow>> {
Ok(Vec::new())
}
fn upsert_session(&self, _session: &crate::task_store::SessionRow) -> Result<()> {
Ok(())
}
fn get_session(&self, _session_id: &str) -> Result<Option<crate::task_store::SessionRow>> {
Ok(None)
}
fn delete_expired_sessions(&self, _now_ms: i64) -> Result<usize> {
Ok(0)
}
fn insert_idempotency_entry(&self, _entry: &crate::task_store::IdempotencyEntry) -> Result<()> {
Ok(())
}
fn get_idempotency_entry(&self, _key: &str) -> Result<Option<crate::task_store::IdempotencyEntry>> {
Ok(None)
}
fn delete_expired_idempotency_entries(&self, _now_ms: i64) -> Result<usize> {
Ok(0)
}
fn try_acquire_leader_lease(
&self,
_scope: &str,
_holder: &str,
_expires_at: i64,
_now_ms: i64,
) -> Result<bool> {
Ok(true)
}
fn renew_leader_lease(&self, _scope: &str, _holder: &str, _expires_at: i64) -> Result<bool> {
Ok(true)
}
fn get_leader_lease(&self, _scope: &str) -> Result<Option<crate::task_store::LeaderLeaseRow>> {
Ok(None)
}
fn upsert_canary(&self, _canary: &crate::task_store::NewCanary) -> Result<()> {
Ok(())
}
fn get_canary(&self, _id: &str) -> Result<Option<crate::task_store::CanaryRow>> {
Ok(None)
}
fn list_canaries(&self) -> Result<Vec<crate::task_store::CanaryRow>> {
Ok(Vec::new())
}
fn delete_canary(&self, _id: &str) -> Result<bool> {
Ok(false)
}
fn insert_canary_run(&self, _run: &crate::task_store::NewCanaryRun, _run_history_limit: usize) -> Result<()> {
Ok(())
}
fn get_canary_runs(&self, _canary_id: &str, _limit: usize) -> Result<Vec<crate::task_store::CanaryRunRow>> {
Ok(Vec::new())
}
fn upsert_cdc_cursor(&self, _cursor: &crate::task_store::NewCdcCursor) -> Result<()> {
Ok(())
}
fn get_cdc_cursor(&self, _sink_name: &str, _index_uid: &str) -> Result<Option<crate::task_store::CdcCursorRow>> {
Ok(None)
}
fn list_cdc_cursors(&self, _sink_name: &str) -> Result<Vec<crate::task_store::CdcCursorRow>> {
Ok(Vec::new())
}
fn insert_tenant_mapping(&self, _mapping: &crate::task_store::NewTenantMapping) -> Result<()> {
Ok(())
}
fn get_tenant_mapping(&self, _api_key_hash: &[u8]) -> Result<Option<crate::task_store::TenantMapRow>> {
Ok(None)
}
fn delete_tenant_mapping(&self, _api_key_hash: &[u8]) -> Result<bool> {
Ok(false)
}
fn upsert_rollover_policy(&self, _policy: &crate::task_store::NewRolloverPolicy) -> Result<()> {
Ok(())
}
fn get_rollover_policy(&self, _name: &str) -> Result<Option<crate::task_store::RolloverPolicyRow>> {
Ok(None)
}
fn list_rollover_policies(&self) -> Result<Vec<crate::task_store::RolloverPolicyRow>> {
Ok(Vec::new())
}
fn delete_rollover_policy(&self, _name: &str) -> Result<bool> {
Ok(false)
}
fn upsert_search_ui_config(&self, _config: &crate::task_store::NewSearchUiConfig) -> Result<()> {
Ok(())
}
fn get_search_ui_config(&self, _index_uid: &str) -> Result<Option<crate::task_store::SearchUiConfigRow>> {
Ok(None)
}
fn delete_search_ui_config(&self, _index_uid: &str) -> Result<bool> {
Ok(false)
}
fn insert_admin_session(&self, _session: &crate::task_store::NewAdminSession) -> Result<()> {
Ok(())
}
fn get_admin_session(&self, _session_id: &str) -> Result<Option<crate::task_store::AdminSessionRow>> {
Ok(None)
}
fn revoke_admin_session(&self, _session_id: &str) -> Result<bool> {
Ok(false)
}
fn delete_expired_admin_sessions(&self, _now_ms: i64) -> Result<usize> {
Ok(0)
}
fn upsert_mode_b_operation(&self, _operation: &crate::task_store::ModeBOperation) -> Result<()> {
Ok(())
}
fn get_mode_b_operation(&self, _operation_id: &str) -> Result<Option<crate::task_store::ModeBOperation>> {
Ok(None)
}
fn get_mode_b_operation_by_scope(&self, _scope: &str) -> Result<Option<crate::task_store::ModeBOperation>> {
Ok(None)
}
fn list_mode_b_operations(&self, _filter: &crate::task_store::ModeBOperationFilter) -> Result<Vec<crate::task_store::ModeBOperation>> {
Ok(Vec::new())
}
fn delete_mode_b_operation(&self, _operation_id: &str) -> Result<bool> {
Ok(false)
}
fn prune_mode_b_operations(&self, _cutoff_ms: i64, _batch_size: u32) -> Result<usize> {
Ok(0)
}
}
/// P6.5-A1: 1 GB dump splits into 4× 256 MiB chunks; 3 pods claim 3 of 4 chunks in parallel.
#[tokio::test]
async fn p6_5_a1_one_gb_dump_splits_into_chunks_processed_in_parallel() {
let store = Arc::new(MockTaskStore::new());
let pod1_coordinator = test_coordinator_with_store("pod-1", store.clone());
let pod2_coordinator = test_coordinator_with_store("pod-2", store.clone());
let pod3_coordinator = test_coordinator_with_store("pod-3", store);
// Enqueue a 1 GB dump import job
let params = JobParams {
index_uid: "test-index".to_string(),
primary_key: Some("id".to_string()),
shard_count: Some(64),
old_shards: None,
target_shards: None,
shadow_index: None,
chunk: None,
source_url: Some("https://example.com/dump.ndjson".to_string()),
source_size_bytes: Some(1_000_000_000), // 1 GB
};
let parent_job_id = pod1_coordinator
.enqueue_job(JobType::DumpImport, params.clone())
.unwrap();
// Pod 1 claims the parent job
let claimed = pod1_coordinator.claim_job().unwrap().unwrap();
assert_eq!(claimed.id, parent_job_id);
assert_eq!(claimed.claimed_by, "pod-1");
// Pod 1 splits the job into 4 chunks (4 × 256 MiB)
let chunk_size_bytes = 268_435_456; // 256 MiB
let total_chunks = ((1_000_000_000 + chunk_size_bytes - 1) / chunk_size_bytes) as u32;
assert_eq!(total_chunks, 4);
let chunks: Vec<JobChunk> = (0..total_chunks)
.map(|i| {
let i = i as u64;
let start = i * chunk_size_bytes;
let end = std::cmp::min(start + chunk_size_bytes, 1_000_000_000);
JobChunk {
index: i as u32,
total: total_chunks,
start: start.to_string(),
end: end.to_string(),
size_bytes: end - start,
}
})
.collect();
pod1_coordinator
.split_job_into_chunks(&claimed, chunks)
.unwrap();
// Verify 4 chunk jobs were created
let child_jobs = pod1_coordinator.list_chunks(&parent_job_id).unwrap();
assert_eq!(child_jobs.len(), 4);
// Each chunk should be in queued state
for child in &child_jobs {
assert_eq!(child.state, "queued");
assert_eq!(child.parent_job_id, Some(parent_job_id.clone()));
}
// Simulate 3 pods claiming 3 of the 4 chunks in parallel
let pod2_claimed = pod2_coordinator.claim_job().unwrap().unwrap();
let pod3_claimed = pod3_coordinator.claim_job().unwrap().unwrap();
let pod1_claimed_chunk = pod1_coordinator.claim_job().unwrap().unwrap();
// Verify each pod claimed a different chunk
let claimed_ids = vec![
pod2_claimed.id.clone(),
pod3_claimed.id.clone(),
pod1_claimed_chunk.id.clone(),
];
assert_eq!(claimed_ids.len(), 3); // 3 distinct claims
// Verify all claimed chunks are now in_progress
for job_id in &claimed_ids {
let job = pod1_coordinator.get_job(job_id).unwrap().unwrap();
assert_eq!(job.state, "in_progress");
}
// One chunk remains queued (unclaimed)
let queued_jobs = pod1_coordinator.list_jobs_by_state("queued").unwrap();
assert_eq!(queued_jobs.len(), 1);
// Queue depth should be 1 (the remaining chunk)
let queue_depth = pod1_coordinator.queue_depth().unwrap();
assert_eq!(queue_depth, 1);
}
/// P6.5-A2: Kill a claimant mid-chunk; claim expires in 30s; another pod picks up and resumes at last_cursor.
#[tokio::test]
async fn p6_5_a2_claim_expiration_allows_resume_at_last_cursor() {
let store = Arc::new(MockTaskStore::new());
let pod1_coordinator = test_coordinator_with_store("pod-1", store.clone());
let pod2_coordinator = test_coordinator_with_store("pod-2", store);
// Enqueue and claim a job
let params = JobParams {
index_uid: "test-index".to_string(),
primary_key: Some("id".to_string()),
shard_count: Some(64),
old_shards: None,
target_shards: None,
shadow_index: None,
chunk: None,
source_url: Some("https://example.com/dump.ndjson".to_string()),
source_size_bytes: Some(1_000_000_000),
};
let job_id = pod1_coordinator
.enqueue_job(JobType::DumpImport, params.clone())
.unwrap();
let claimed = pod1_coordinator.claim_job().unwrap().unwrap();
assert_eq!(claimed.claimed_by, "pod-1");
// Simulate processing progress (50% complete)
let progress = JobProgress {
bytes_processed: 500_000_000,
docs_routed: 5_000_000,
last_cursor: "500000000".to_string(),
error: None,
};
pod1_coordinator
.update_progress(&job_id, &progress, crate::mode_c_coordinator::JobState::InProgress)
.unwrap();
// Simulate claim expiration by setting claim_expires_at to the past
let now = now_ms();
let past_expiration = now - 10_000; // 10 seconds ago
pod1_coordinator
.set_claim_expires_at_for_test(&job_id, past_expiration)
.unwrap();
// Pod 2 reclaims expired claims
let reclaimed = pod2_coordinator.reclaim_expired_claims().unwrap();
assert_eq!(reclaimed, 1);
// Verify the job is back in queued state
let job = pod2_coordinator.get_job(&job_id).unwrap().unwrap();
assert_eq!(job.state, "queued");
assert!(job.claimed_by.is_none());
// Verify progress was preserved
let preserved_progress: JobProgress = serde_json::from_str(&job.progress).unwrap();
assert_eq!(preserved_progress.bytes_processed, 500_000_000);
assert_eq!(preserved_progress.docs_routed, 5_000_000);
assert_eq!(preserved_progress.last_cursor, "500000000");
// Pod 2 claims the job
let pod2_claimed = pod2_coordinator.claim_job().unwrap().unwrap();
assert_eq!(pod2_claimed.id, job_id);
assert_eq!(pod2_claimed.claimed_by, "pod-2");
// Pod 2 can resume from the last_cursor
let resume_progress: JobProgress = pod2_claimed.parse_progress().unwrap();
assert_eq!(resume_progress.last_cursor, "500000000");
}
/// P6.5-A3: HPA on `miroir_background_queue_depth > 10` triggers scale-up; scale-down once empty.
#[tokio::test]
async fn p6_5_a3_queue_depth_metric_drives_hpa_scaling() {
let coordinator = test_coordinator("pod-1");
// Initially, queue is empty
let queue_depth = coordinator.queue_depth().unwrap();
assert_eq!(queue_depth, 0);
// Enqueue 15 jobs (above HPA threshold of 10)
for i in 0..15 {
let params = JobParams {
index_uid: format!("test-index-{}", i),
primary_key: Some("id".to_string()),
shard_count: Some(64),
old_shards: None,
target_shards: None,
shadow_index: None,
chunk: None,
source_url: Some(format!("https://example.com/dump{}.ndjson", i)),
source_size_bytes: Some(1_000_000_000),
};
coordinator
.enqueue_job(JobType::DumpImport, params)
.unwrap();
}
// Queue depth should be 15
let queue_depth = coordinator.queue_depth().unwrap();
assert_eq!(queue_depth, 15);
// HPA would scale up when queue_depth > 10
let should_scale_up = queue_depth > 10;
assert!(should_scale_up);
// Simulate processing: claim and complete jobs
for _ in 0..15 {
if let Some(claimed) = coordinator.claim_job().unwrap() {
let progress = JobProgress {
bytes_processed: 1_000_000_000,
docs_routed: 10_000_000,
last_cursor: "1000000000".to_string(),
error: None,
};
coordinator
.complete_job(&claimed.id, &progress)
.unwrap();
}
}
// Queue depth should now be 0
let queue_depth = coordinator.queue_depth().unwrap();
assert_eq!(queue_depth, 0);
// HPA would scale down when queue is empty
let should_scale_down = queue_depth == 0;
assert!(should_scale_down);
}
/// P6.5-A4: Two concurrent dumps; chunks interleave in claims; neither starves.
#[tokio::test]
async fn p6_5_a4_concurrent_dumps_interleave_without_starvation() {
let store = Arc::new(MockTaskStore::new());
let pod1_coordinator = test_coordinator_with_store("pod-1", store.clone());
let pod2_coordinator = test_coordinator_with_store("pod-2", store);
// Enqueue two dump jobs concurrently
let params1 = JobParams {
index_uid: "test-index-1".to_string(),
primary_key: Some("id".to_string()),
shard_count: Some(64),
old_shards: None,
target_shards: None,
shadow_index: None,
chunk: None,
source_url: Some("https://example.com/dump1.ndjson".to_string()),
source_size_bytes: Some(1_000_000_000),
};
let params2 = JobParams {
index_uid: "test-index-2".to_string(),
primary_key: Some("id".to_string()),
shard_count: Some(64),
old_shards: None,
target_shards: None,
shadow_index: None,
chunk: None,
source_url: Some("https://example.com/dump2.ndjson".to_string()),
source_size_bytes: Some(1_000_000_000),
};
let job_id1 = pod1_coordinator
.enqueue_job(JobType::DumpImport, params1)
.unwrap();
let job_id2 = pod1_coordinator
.enqueue_job(JobType::DumpImport, params2)
.unwrap();
// Pod 1 claims first job and splits it
let claimed1 = pod1_coordinator.claim_job().unwrap().unwrap();
assert_eq!(claimed1.id, job_id1);
let chunks1: Vec<JobChunk> = vec![
JobChunk {
index: 0,
total: 2,
start: "0".to_string(),
end: "500000000".to_string(),
size_bytes: 500_000_000,
},
JobChunk {
index: 1,
total: 2,
start: "500000000".to_string(),
end: "1000000000".to_string(),
size_bytes: 500_000_000,
},
];
pod1_coordinator
.split_job_into_chunks(&claimed1, chunks1)
.unwrap();
// Pod 2 claims second job and splits it
let claimed2 = pod2_coordinator.claim_job().unwrap().unwrap();
assert_eq!(claimed2.id, job_id2);
let chunks2: Vec<JobChunk> = vec![
JobChunk {
index: 0,
total: 2,
start: "0".to_string(),
end: "500000000".to_string(),
size_bytes: 500_000_000,
},
JobChunk {
index: 1,
total: 2,
start: "500000000".to_string(),
end: "1000000000".to_string(),
size_bytes: 500_000_000,
},
];
pod2_coordinator
.split_job_into_chunks(&claimed2, chunks2)
.unwrap();
// Now we have 4 chunks total (2 from each dump)
let all_queued = pod1_coordinator.list_jobs_by_state("queued").unwrap();
assert_eq!(all_queued.len(), 4);
// Verify chunks are interleaved (both parent jobs are represented)
let parent_ids: Vec<_> = all_queued
.iter()
.filter_map(|j| j.parent_job_id.as_ref())
.collect();
assert!(parent_ids.contains(&&job_id1));
assert!(parent_ids.contains(&&job_id2));
// Both pods claim chunks from both dumps (fair interleaving)
let pod1_claimed = pod1_coordinator.claim_job().unwrap().unwrap();
let pod2_claimed = pod2_coordinator.claim_job().unwrap().unwrap();
// Verify they claimed chunks from different parents (or same - interleaving is fair)
let claimed_parent1 = pod1_claimed.parent_job_id.as_ref();
let claimed_parent2 = pod2_claimed.parent_job_id.as_ref();
// At least one parent should be represented
assert!(
claimed_parent1.is_some() || claimed_parent2.is_some(),
"at least one chunk should be claimed"
);
// Neither dump starves: both have chunks remaining
let remaining_queued = pod1_coordinator.list_jobs_by_state("queued").unwrap();
assert_eq!(remaining_queued.len(), 2);
// Verify both parent jobs still have pending chunks
let remaining_parents: Vec<_> = remaining_queued
.iter()
.filter_map(|j| j.parent_job_id.as_ref())
.collect();
assert!(remaining_parents.contains(&&job_id1) || remaining_parents.contains(&&job_id2));
}
/// P6.5-A5: Reshard backfill splits by shard-id range.
#[tokio::test]
async fn p6_5_a5_reshard_backfill_splits_by_shard_id_range() {
let coordinator = test_coordinator("pod-1");
// Enqueue a reshard backfill job (64 → 128 shards)
let params = JobParams {
index_uid: "test-index".to_string(),
primary_key: None,
shard_count: None,
old_shards: Some(64),
target_shards: Some(128),
shadow_index: Some("test-index-shadow".to_string()),
chunk: None,
source_url: None,
source_size_bytes: None,
};
let job_id = coordinator
.enqueue_job(JobType::ReshardBackfill, params)
.unwrap();
let claimed = coordinator.claim_job().unwrap().unwrap();
// Split into chunks by shard-id range (32 shards per chunk)
let old_shards = 64;
let shards_per_chunk = 32;
let total_chunks = (old_shards + shards_per_chunk - 1) / shards_per_chunk; // 2 chunks
let chunks: Vec<JobChunk> = (0..total_chunks)
.map(|i| {
let i = i as u32;
let start_shard = i * shards_per_chunk;
let end_shard = std::cmp::min(start_shard + shards_per_chunk, old_shards);
JobChunk {
index: i,
total: total_chunks as u32,
start: start_shard.to_string(),
end: end_shard.to_string(),
size_bytes: (end_shard - start_shard) as u64,
}
})
.collect();
coordinator
.split_job_into_chunks(&claimed, chunks)
.unwrap();
// Verify 2 chunks were created
let child_jobs = coordinator.list_chunks(&job_id).unwrap();
assert_eq!(child_jobs.len(), 2);
// Verify shard ranges
let chunk0 = &child_jobs[0];
let chunk1 = &child_jobs[1];
let claimed0 = crate::mode_c_coordinator::ClaimedJob {
id: chunk0.id.clone(),
type_: chunk0.type_.clone(),
params: chunk0.params.clone(),
progress: chunk0.progress.clone(),
claimed_by: "test".to_string(),
claim_expires_at: 0,
parent_job_id: chunk0.parent_job_id.clone(),
chunk_index: chunk0.chunk_index,
total_chunks: chunk0.total_chunks,
};
let params0: JobParams = claimed0.parse_params().unwrap();
let claimed1 = crate::mode_c_coordinator::ClaimedJob {
id: chunk1.id.clone(),
type_: chunk1.type_.clone(),
params: chunk1.params.clone(),
progress: chunk1.progress.clone(),
claimed_by: "test".to_string(),
claim_expires_at: 0,
parent_job_id: chunk1.parent_job_id.clone(),
chunk_index: chunk1.chunk_index,
total_chunks: chunk1.total_chunks,
};
let params1: JobParams = claimed1.parse_params().unwrap();
assert_eq!(params0.chunk.as_ref().unwrap().start, "0");
assert_eq!(params0.chunk.as_ref().unwrap().end, "32");
assert_eq!(params1.chunk.as_ref().unwrap().start, "32");
assert_eq!(params1.chunk.as_ref().unwrap().end, "64");
}
/// P6.5-A6: Heartbeat renews claim; missed heartbeat causes expiration.
#[tokio::test]
async fn p6_5_a6_heartbeat_renews_claim_missed_heartbeat_expires() {
let coordinator = test_coordinator("pod-1");
// Enqueue and claim a job
let params = JobParams {
index_uid: "test-index".to_string(),
primary_key: Some("id".to_string()),
shard_count: Some(64),
old_shards: None,
target_shards: None,
shadow_index: None,
chunk: None,
source_url: Some("https://example.com/dump.ndjson".to_string()),
source_size_bytes: Some(1_000_000_000),
};
let job_id = coordinator
.enqueue_job(JobType::DumpImport, params)
.unwrap();
let claimed = coordinator.claim_job().unwrap().unwrap();
let original_expires_at = claimed.claim_expires_at;
// Sleep a bit to ensure time advances
sleep(Duration::from_millis(10)).await;
// Renew the claim (heartbeat)
let renewed = coordinator.renew_claim(&job_id).unwrap();
assert!(renewed);
// Verify the expiration time was extended
let job = coordinator.get_job(&job_id).unwrap().unwrap();
assert!(job.claim_expires_at.unwrap() > original_expires_at);
// Simulate missed heartbeat by setting expiration to the past
let now = now_ms();
coordinator
.set_claim_expires_at_for_test(&job_id, now - 1000)
.unwrap();
// Reclaim expired claims
let reclaimed = coordinator.reclaim_expired_claims().unwrap();
assert_eq!(reclaimed, 1);
// Job should be back in queued state
let job = coordinator.get_job(&job_id).unwrap().unwrap();
assert_eq!(job.state, "queued");
}
/// Helper to get current time in milliseconds.
fn now_ms() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}

View file

@ -4,6 +4,9 @@
//! and renews claims. Large jobs are split into chunks; chunk jobs execute
//! the actual work (dump import, reshard backfill).
#[cfg(test)]
mod acceptance_tests;
use crate::error::{MiroirError, Result};
use crate::mode_c_coordinator::{ClaimedJob, JobChunk, JobParams, JobProgress, JobType, ModeCCoordinator};
use crate::reshard_chunking;

View file

@ -69,6 +69,10 @@ impl TaskStore for MockTaskStore {
claimed_by: None,
claim_expires_at: None,
progress: job.progress.clone(),
parent_job_id: job.parent_job_id.clone(),
chunk_index: job.chunk_index,
total_chunks: job.total_chunks,
created_at: Some(job.created_at),
});
Ok(())
}
@ -87,6 +91,30 @@ impl TaskStore for MockTaskStore {
Ok(jobs.clone())
}
fn count_jobs_by_state(&self, _state: &str) -> Result<u64> {
Ok(0)
}
fn list_expired_claims(&self, _now_ms: i64) -> Result<Vec<JobRow>> {
Ok(Vec::new())
}
fn list_jobs_by_parent(&self, _parent_job_id: &str) -> Result<Vec<JobRow>> {
Ok(Vec::new())
}
fn reclaim_job_claim(&self, _id: &str, _state: &str, _progress: &str) -> Result<bool> {
Ok(true)
}
fn claim_job(&self, _id: &str, _claimed_by: &str, _claim_expires_at: i64) -> Result<bool> {
Ok(false)
}
fn renew_job_claim(&self, _id: &str, _claim_expires_at: i64) -> Result<bool> {
Ok(false)
}
fn try_acquire_leader_lease(
&self,
scope: &str,
@ -207,12 +235,7 @@ impl TaskStore for MockTaskStore {
fn delete_expired_idempotency_entries(&self, _now_ms: i64) -> Result<usize> {
Ok(0)
}
fn claim_job(&self, _id: &str, _claimed_by: &str, _claim_expires_at: i64) -> Result<bool> {
Ok(false)
}
fn renew_job_claim(&self, _id: &str, _claim_expires_at: i64) -> Result<bool> {
Ok(false)
}
fn upsert_canary(&self, _canary: &crate::task_store::NewCanary) -> Result<()> {
Ok(())
}
@ -453,6 +476,10 @@ async fn p4_1_a2_progress_persistence_pods_resume_migration() {
params: progress,
state: "running".to_string(),
progress: "{\"total_shards\":1,\"completed\":0,\"docs_migrated\":5000}".to_string(),
parent_job_id: None,
chunk_index: None,
total_chunks: None,
created_at: now_ms(),
};
tokio::task::spawn_blocking({
let task_store = task_store.clone();

View file

@ -1193,6 +1193,10 @@ impl RebalancerWorker {
.count(),
job.total_docs_migrated
),
parent_job_id: None,
chunk_index: None,
total_chunks: None,
created_at: now_ms(),
};
tokio::task::spawn_blocking({

View file

@ -103,6 +103,22 @@ impl TaskStore for MockTaskStore {
Ok(Vec::new())
}
fn count_jobs_by_state(&self, _state: &str) -> Result<u64> {
Ok(0)
}
fn list_expired_claims(&self, _now_ms: i64) -> Result<Vec<JobRow>> {
Ok(Vec::new())
}
fn list_jobs_by_parent(&self, _parent_job_id: &str) -> Result<Vec<JobRow>> {
Ok(Vec::new())
}
fn reclaim_job_claim(&self, _id: &str, _state: &str, _progress: &str) -> Result<bool> {
Ok(true)
}
fn try_acquire_leader_lease(
&self,
_scope: &str,

View file

@ -182,6 +182,10 @@ fn test_multiple_tables_survive_restart() {
params: "{}".to_string(),
state: "queued".to_string(),
progress: "{}".to_string(),
parent_job_id: None,
chunk_index: None,
total_chunks: None,
created_at: 1714500000000,
};
store.insert_job(&job).unwrap();

View file

@ -399,6 +399,10 @@ proptest! {
params: r#"{"test": "param"}"#.to_string(),
state: state.clone(),
progress: r#"{"status": "starting"}"#.to_string(),
parent_job_id: None,
chunk_index: None,
total_chunks: None,
created_at: 1714500000000,
};
store.insert_job(&job).unwrap();
@ -423,6 +427,10 @@ proptest! {
params: "{}".to_string(),
state: "queued".to_string(),
progress: "{}".to_string(),
parent_job_id: None,
chunk_index: None,
total_chunks: None,
created_at: 1714500000000,
};
store.insert_job(&job).unwrap();

View file

@ -17,6 +17,7 @@ use miroir_core::{
task_registry::TaskRegistryImpl,
task_store::{RedisTaskStore, TaskStore},
topology::{Node, NodeId, Topology},
mode_c_worker::{ModeCWorker, ModeCWorkerConfig},
};
use rand::RngCore;
use serde::{Deserialize, Serialize};
@ -331,6 +332,8 @@ pub struct AppState {
pub alias_registry: Arc<miroir_core::alias::AliasRegistry>,
/// Leader election service for Mode B operations (plan §14.5).
pub leader_election: Option<Arc<LeaderElection>>,
/// Mode C worker for chunked background jobs (plan §14.5 Mode C).
pub mode_c_worker: Option<Arc<ModeCWorker>>,
}
impl AppState {
@ -543,6 +546,22 @@ impl AppState {
None
};
// Create Mode C worker for chunked background jobs (plan §14.5 Mode C)
let mode_c_worker = if let Some(ref store) = task_store {
let worker_config = ModeCWorkerConfig {
poll_interval_ms: 1000, // 1 second
heartbeat_interval_ms: 10000, // 10 seconds
max_concurrent_jobs: 3,
};
Some(Arc::new(ModeCWorker::new(
store.clone(),
pod_id.clone(),
worker_config,
)))
} else {
None
};
Self {
config: Arc::new(config),
topology: topology_arc,
@ -566,6 +585,7 @@ impl AppState {
session_manager,
alias_registry,
leader_election,
mode_c_worker,
}
}