P1.5: Implement scatter module with covering-set construction + dispatch trait

- Add NodeClient trait for HTTP calls to Meilisearch nodes (seam between pure miroir-core and networked miroir-proxy)
- Add ScatterPlan struct containing chosen_group, target_shards, shard_to_node mapping, deadline_ms, hedging_eligible
- Implement plan_search_scatter() pure function that constructs the covering set without I/O
- Implement execute_scatter() async function that fans out to nodes with partial-failure handling
- Add MockNodeClient for testing with pre-programmed responses/errors
- Add unit tests for plan construction, query group rotation, shard-to-node mapping, hedging eligibility, and scatter execution

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-04-19 00:20:29 -04:00
parent 3481172f65
commit 612e7ce0ea
38 changed files with 2296 additions and 120 deletions

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-qon.1",
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 0,
"outcome": "success",
"duration_ms": 13367,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T00:50:15.822899604Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1,3 @@
```json
{"splittable": false}
```

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-qon.2",
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 0,
"outcome": "success",
"duration_ms": 16071,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T00:51:19.041392499Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1 @@
{"splittable": false}

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-qon.3",
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 0,
"outcome": "success",
"duration_ms": 13795,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T00:49:39.940334365Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1 @@
{"splittable": false}

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-qon.4",
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 0,
"outcome": "success",
"duration_ms": 10715,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T00:50:28.482335907Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1,3 @@
```json
{"splittable": false}
```

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-qon.5",
"agent": "claude-code-glm-5-1",
"provider": "zai",
"model": "glm-5.1",
"exit_code": 0,
"outcome": "success",
"duration_ms": 19143,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T00:55:42.734289546Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1 @@
{"splittable": false}

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-qon.6",
"agent": "claude-code-glm-5-1",
"provider": "zai",
"model": "glm-5.1",
"exit_code": 0,
"outcome": "success",
"duration_ms": 133404,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T00:48:22.952561028Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1 @@
Done. Created `LICENSE` (MIT), `CHANGELOG.md` (Keep a Changelog 1.1.0 skeleton), and `.gitignore` (Rust + editor junk). All acceptance criteria verified — awk extractor returns content for `[0.1.0]`, `Cargo.lock` stays in VCS. Root commit `409f952` initialized the git repo. Bead `miroir-qon.6` closed.

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-qon.7",
"agent": "claude-code-glm-5-1",
"provider": "zai",
"model": "glm-5.1",
"exit_code": 0,
"outcome": "success",
"duration_ms": 12793,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T00:55:03.752087580Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1 @@
{"splittable": false}

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-zc2.1",
"agent": "claude-code-glm-5",
"provider": "zai",
"model": "glm-5",
"exit_code": 124,
"outcome": "timeout",
"duration_ms": 600001,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T00:57:22.426635970Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1 @@
Error: Reached max turns (30)

View file

@ -0,0 +1,16 @@
{
"bead_id": "miroir-zc2.2",
"agent": "claude-code-glm-5",
"provider": "zai",
"model": "glm-5",
"exit_code": 124,
"outcome": "timeout",
"duration_ms": 600002,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-04-19T01:02:21.906008483Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null
}

View file

View file

@ -0,0 +1,3 @@
Done. Research doc published at `docs/research/raft-task-store.md`, committed as `fe274a5`, pushed to master, bead `miroir-zc2.2` closed.
**Decision: Revisit before v2.0, do not ship in v0.x or v1.0.** Raft fails the plan's decision gate — it's 38x slower on writes, adds 90185 MB per pod, and has lower correctness maturity than Redis. The one genuine win (no external dependency) doesn't compensate for the regression on the other metrics. The hybrid approach (all three backends behind the same `TaskStore` trait) is documented as the migration path if we decide to ship Raft later.

3
.cargo/config.toml Normal file
View file

@ -0,0 +1,3 @@
[env]
CC_x86_64_unknown_linux_musl = "gcc"
CFLAGS_x86_64_unknown_linux_musl = "-static"

View file

@ -1 +1 @@
2b1ea87f3e20825ca10ab4a4bea83e1a30bb7800
47d586cc61c5c7c8051a4f3ea41040ddf9a2479e

456
Cargo.lock generated
View file

@ -29,6 +29,12 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "anes"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstream"
version = "1.0.0"
@ -108,6 +114,12 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "axum"
version = "0.7.9"
@ -195,6 +207,21 @@ dependencies = [
"virtue",
]
[[package]]
name = "bit-set"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3"
dependencies = [
"bit-vec",
]
[[package]]
name = "bit-vec"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7"
[[package]]
name = "bitflags"
version = "2.11.1"
@ -225,6 +252,12 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "cast"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.2.60"
@ -247,6 +280,33 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "ciborium"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e"
dependencies = [
"ciborium-io",
"ciborium-ll",
"serde",
]
[[package]]
name = "ciborium-io"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757"
[[package]]
name = "ciborium-ll"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9"
dependencies = [
"ciborium-io",
"half",
]
[[package]]
name = "clap"
version = "4.6.1"
@ -350,6 +410,67 @@ dependencies = [
"libc",
]
[[package]]
name = "criterion"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f"
dependencies = [
"anes",
"cast",
"ciborium",
"clap",
"criterion-plot",
"is-terminal",
"itertools",
"num-traits",
"once_cell",
"oorandom",
"plotters",
"rayon",
"regex",
"serde",
"serde_derive",
"serde_json",
"tinytemplate",
"walkdir",
]
[[package]]
name = "criterion-plot"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
dependencies = [
"cast",
"itertools",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "crunchy"
version = "0.2.4"
@ -423,6 +544,12 @@ dependencies = [
"const-random",
]
[[package]]
name = "either"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "encoding_rs"
version = "0.8.35"
@ -448,6 +575,18 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "2.4.1"
@ -472,6 +611,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foldhash"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
@ -496,6 +641,17 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-task"
version = "0.3.32"
@ -509,6 +665,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-macro",
"futures-task",
"pin-project-lite",
"slab",
@ -564,6 +721,17 @@ dependencies = [
"wasip3",
]
[[package]]
name = "half"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b"
dependencies = [
"cfg-if",
"crunchy",
"zerocopy",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
@ -580,7 +748,16 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"foldhash",
"foldhash 0.1.5",
]
[[package]]
name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
dependencies = [
"foldhash 0.2.0",
]
[[package]]
@ -598,12 +775,27 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "hashlink"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea0b22561a9c04a7cb1a302c013e0259cd3b4bb619f145b32f72b8b4bcbed230"
dependencies = [
"hashbrown 0.16.1",
]
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]]
name = "http"
version = "1.4.0"
@ -846,12 +1038,32 @@ dependencies = [
"serde",
]
[[package]]
name = "is-terminal"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.61.2",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
[[package]]
name = "itertools"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.18"
@ -908,6 +1120,17 @@ dependencies = [
"libc",
]
[[package]]
name = "libsqlite3-sys"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f111c8c41e7c61a49cd34e44c7619462967221a6443b0ec299e0ac30cfb9b1"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.12.1"
@ -991,10 +1214,16 @@ version = "0.1.0"
dependencies = [
"bincode",
"config",
"criterion",
"futures-util",
"proptest",
"rusqlite",
"serde",
"serde_json",
"serde_yaml",
"tempfile",
"thiserror 2.0.18",
"tokio",
"tracing",
"twox-hash",
"uuid",
@ -1055,6 +1284,15 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "once_cell"
version = "1.21.4"
@ -1067,6 +1305,12 @@ version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "oorandom"
version = "11.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e"
[[package]]
name = "option-ext"
version = "0.2.0"
@ -1167,6 +1411,40 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pkg-config"
version = "0.3.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
[[package]]
name = "plotters"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747"
dependencies = [
"num-traits",
"plotters-backend",
"plotters-svg",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "plotters-backend"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a"
[[package]]
name = "plotters-svg"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670"
dependencies = [
"plotters-backend",
]
[[package]]
name = "potential_utf"
version = "0.1.5"
@ -1229,12 +1507,37 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "proptest"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744"
dependencies = [
"bit-set",
"bit-vec",
"bitflags",
"num-traits",
"rand",
"rand_chacha",
"rand_xorshift",
"regex-syntax",
"rusty-fork",
"tempfile",
"unarray",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quinn"
version = "0.11.9"
@ -1340,6 +1643,35 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "rand_xorshift"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "513962919efc330f829edb2535844d1b912b0fbe2ca165d613e4e8788bb05a5a"
dependencies = [
"rand_core",
]
[[package]]
name = "rayon"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb39b166781f92d482534ef4b4b1b2568f42613b53e5b6c160e24cfbfa30926d"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
@ -1360,6 +1692,18 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "regex"
version = "1.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.14"
@ -1441,6 +1785,31 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "rsqlite-vfs"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8a1f2315036ef6b1fbacd1972e8ee7688030b0a2121edfc2a6550febd41574d"
dependencies = [
"hashbrown 0.16.1",
"thiserror 2.0.18",
]
[[package]]
name = "rusqlite"
version = "0.39.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0d2b0146dd9661bf67bb107c0bb2a55064d556eeb3fc314151b957f313bcd4e"
dependencies = [
"bitflags",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink 0.11.0",
"libsqlite3-sys",
"smallvec",
"sqlite-wasm-rs",
]
[[package]]
name = "rust-ini"
version = "0.20.0"
@ -1511,12 +1880,33 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "rusty-fork"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc6bf79ff24e648f6da1f8d1f011e9cac26491b619e6b9280f2b47f1774e6ee2"
dependencies = [
"fnv",
"quick-error",
"tempfile",
"wait-timeout",
]
[[package]]
name = "ryu"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -1675,6 +2065,18 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "sqlite-wasm-rs"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b2c760607300407ddeaee518acf28c795661b7108c75421303dbefb237d3a36"
dependencies = [
"cc",
"js-sys",
"rsqlite-vfs",
"wasm-bindgen",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.1"
@ -1805,6 +2207,16 @@ dependencies = [
"zerovec",
]
[[package]]
name = "tinytemplate"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "tinyvec"
version = "1.11.0"
@ -2034,6 +2446,12 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971"
[[package]]
name = "unarray"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94"
[[package]]
name = "unicode-ident"
version = "1.0.24"
@ -2112,6 +2530,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.5"
@ -2124,6 +2548,25 @@ version = "0.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1"
[[package]]
name = "wait-timeout"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ac3b126d3914f9849036f826e054cbabdc8519970b8998ddaf3b5bd3c65f11"
dependencies = [
"libc",
]
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "want"
version = "0.3.1"
@ -2275,6 +2718,15 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "windows-link"
version = "0.2.1"
@ -2546,7 +2998,7 @@ checksum = "8902160c4e6f2fb145dbe9d6760a75e3c9522d8bf796ed7047c85919ac7115f8"
dependencies = [
"arraydeque",
"encoding_rs",
"hashlink",
"hashlink 0.8.4",
]
[[package]]

View file

@ -15,3 +15,4 @@ serde_json = "1.0"
thiserror = "2.0"
tracing = "0.1"
pretty_assertions = "1.4"
rusqlite = { version = "0.39", features = ["bundled"] }

View file

@ -16,6 +16,7 @@ tracing = { workspace = true }
uuid = { version = "1", features = ["v4", "serde"] }
config = "0.14"
rusqlite = { workspace = true }
futures-util = "0.3"
# Raft prototype (P12.OP2 research) — not for production use
# openraft 0.9.22 fails on stable Rust 1.87 (validit uses let_chains).
@ -44,3 +45,4 @@ harness = false
tempfile = "3"
proptest = "1"
criterion = "0.5"
tokio = { version = "1", features = ["rt", "macros", "time"] }

View file

@ -1,74 +1,843 @@
//! Result merger: combines shard results into a single response.
use crate::Result;
use serde_json::Value;
use serde_json::{Map, Value};
use std::collections::BTreeMap;
use std::cmp::Ordering;
/// Result merger: combines responses from multiple shards.
pub trait Merger: Send + Sync {
/// Merge search results from multiple shards.
///
/// Takes the raw JSON responses from each shard and produces
/// a merged result with global sorting, offset/limit applied,
/// and facet aggregation.
fn merge(
&self,
shard_responses: Vec<ShardResponse>,
offset: usize,
limit: usize,
client_requested_score: bool,
) -> Result<MergedResult>;
/// Input to the merge operation.
#[derive(Debug, Clone)]
pub struct MergeInput {
/// One response page per node in the covering set.
pub shard_hits: Vec<ShardHitPage>,
/// Original offset from the client request.
pub offset: usize,
/// Original limit from the client request.
pub limit: usize,
/// Whether the client requested scores in the response.
pub client_requested_score: bool,
/// Facet names requested (for filtering which facets to return).
pub facets: Option<Vec<String>>,
}
/// Response from a single shard.
/// Response from a single shard (node).
#[derive(Debug, Clone)]
pub struct ShardResponse {
/// Shard identifier.
pub shard_id: u32,
pub struct ShardHitPage {
/// Raw JSON response from the node.
pub body: Value,
/// Whether this shard succeeded.
pub success: bool,
}
/// Merged search result.
#[derive(Debug, Clone)]
pub struct MergedResult {
#[derive(Debug, Clone, serde::Serialize)]
pub struct MergedSearchResult {
/// Merged hits (globally sorted, offset/limit applied).
pub hits: Vec<Value>,
/// Aggregated facets.
pub facets: Value,
/// Aggregated facet distribution.
pub facet_distribution: Option<BTreeMap<String, BTreeMap<String, u64>>>,
/// Estimated total hits (sum of shard totals).
pub total_hits: u64,
pub estimated_total_hits: u64,
/// Processing time in milliseconds.
/// Processing time in milliseconds (max across covering set).
pub processing_time_ms: u64,
/// Whether the response is degraded (some shards failed).
/// Whether the response is degraded (some shards had errors).
pub degraded: bool,
}
/// Default stub implementation of Merger.
#[derive(Debug, Clone, Default)]
pub struct StubMerger;
/// RRF constant k.
///
/// This is the denominator constant used in Reciprocal Rank Fusion.
/// The value 60 is the default recommended in the RRF literature and
/// is used by OpenSearch for hybrid search.
const RRF_K: u32 = 60;
impl Merger for StubMerger {
fn merge(
&self,
_shard_responses: Vec<ShardResponse>,
_offset: usize,
_limit: usize,
_client_requested_score: bool,
) -> Result<MergedResult> {
Ok(MergedResult {
hits: Vec::new(),
facets: serde_json::json!({}),
total_hits: 0,
processing_time_ms: 0,
degraded: false,
})
/// A document with its accumulated RRF score.
#[derive(Debug, Clone)]
struct RRFDocument {
/// Accumulated RRF score across all shards.
rrf_score: f64,
/// Primary key for tie-breaking.
primary_key: String,
/// The hit document (JSON object) from the highest-ranking shard.
hit: Map<String, Value>,
}
impl PartialEq for RRFDocument {
fn eq(&self, other: &Self) -> bool {
self.rrf_score == other.rrf_score && self.primary_key == other.primary_key
}
}
impl Eq for RRFDocument {}
impl PartialOrd for RRFDocument {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for RRFDocument {
fn cmp(&self, other: &Self) -> Ordering {
// Primary sort: RRF score descending (higher score = better rank)
match self.rrf_score.partial_cmp(&other.rrf_score) {
Some(Ordering::Equal) => {
// Secondary sort: primary key ascending for deterministic tie-breaking
self.primary_key.cmp(&other.primary_key)
}
Some(ord) => ord.reverse(),
None => {
// NaN case: treat as lowest score
if self.rrf_score.is_nan() && !other.rrf_score.is_nan() {
Ordering::Less
} else if !self.rrf_score.is_nan() && other.rrf_score.is_nan() {
Ordering::Greater
} else {
Ordering::Equal
}
}
}
}
}
/// Merge search results from multiple shards into a single response.
///
/// This is a pure function with no side effects, making it testable
/// without a network and ensuring deterministic output.
pub fn merge(input: MergeInput) -> Result<MergedSearchResult> {
let mut estimated_total_hits = 0u64;
let mut max_processing_time = 0u64;
let mut degraded = false;
// Collect all hits with their ranks from all shards.
// Use a map to aggregate RRF scores for documents appearing in multiple shards.
let mut rrf_map: std::collections::HashMap<String, RRFDocument> = std::collections::HashMap::new();
for shard_page in &input.shard_hits {
let body = &shard_page.body;
// Check for degraded response.
if let Some(serde_json::Value::Bool(false)) = body.get("success") {
degraded = true;
continue;
}
// Extract estimated total hits.
if let Some(Value::Number(n)) = body.get("estimatedTotalHits") {
if let Some(n) = n.as_u64() {
estimated_total_hits = estimated_total_hits.saturating_add(n);
}
}
// Extract processing time.
if let Some(Value::Number(n)) = body.get("processingTimeMs") {
if let Some(n) = n.as_u64() {
max_processing_time = max_processing_time.max(n);
}
}
// Extract hits with ranks (position in shard's results).
if let Some(Value::Array(hits)) = body.get("hits") {
for (rank, hit) in hits.iter().enumerate() {
if let Value::Object(ref map) = hit {
let map = map.clone();
// Extract primary key for deduplication.
let primary_key = map
.get("id")
.or_else(|| map.get("pk"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
// Compute RRF contribution: 1 / (k + rank)
// rank is 0-based, so we add 1 to convert to 1-based for RRF formula
let rrf_contribution = 1.0 / ((RRF_K as f64) + (rank as f64) + 1.0);
// Aggregate RRF scores across shards.
use std::collections::hash_map::Entry;
match rrf_map.entry(primary_key.clone()) {
Entry::Vacant(e) => {
e.insert(RRFDocument {
rrf_score: rrf_contribution,
primary_key,
hit: map,
});
}
Entry::Occupied(mut e) => {
// Document appears in multiple shards: sum RRF contributions.
e.get_mut().rrf_score += rrf_contribution;
}
}
}
}
}
}
// Convert map to vec and sort by RRF score descending.
let mut merged_docs: Vec<_> = rrf_map.into_values().collect();
merged_docs.sort();
// Apply offset + limit.
let skip = input.offset;
let take = input.limit;
let paginated_hits: Vec<_> = merged_docs
.into_iter()
.skip(skip)
.take(take)
.collect();
// Strip reserved fields and rebuild hits.
let mut hits = Vec::with_capacity(paginated_hits.len());
for mut doc in paginated_hits {
// Strip _rankingScore if not requested (RRF doesn't use original scores).
if !input.client_requested_score {
doc.hit.remove("_rankingScore");
}
// Always strip _miroir_* fields.
doc.hit.retain(|k, _| !k.starts_with("_miroir_"));
hits.push(Value::Object(doc.hit));
}
// Merge facets.
let facet_distribution = merge_facets(&input.shard_hits, input.facets.as_deref());
Ok(MergedSearchResult {
hits,
facet_distribution,
estimated_total_hits,
processing_time_ms: max_processing_time,
degraded,
})
}
/// Merge facet distributions from multiple shards.
///
/// Uses BTreeMap for stable ordering (deterministic serialization).
fn merge_facets(
shard_pages: &[ShardHitPage],
requested_facets: Option<&[String]>,
) -> Option<BTreeMap<String, BTreeMap<String, u64>>> {
let mut merged: BTreeMap<String, BTreeMap<String, u64>> = BTreeMap::new();
for shard_page in shard_pages {
let body = &shard_page.body;
// Meilisearch uses "facetDistribution" for facet results.
if let Some(Value::Object(facets)) = body.get("facetDistribution") {
for (facet_name, facet_values) in facets {
// Skip if not requested (if a filter was provided).
if let Some(requested) = requested_facets {
if !requested.iter().any(|f| f == facet_name) {
continue;
}
}
if let Value::Object(values_map) = facet_values {
let merged_facet = merged.entry(facet_name.clone()).or_default();
for (value, count) in values_map {
if let Value::Number(n) = count {
if let Some(n) = n.as_u64() {
*merged_facet.entry(value.clone()).or_insert(0) += n;
}
}
}
}
}
}
}
if merged.is_empty() {
None
} else {
Some(merged)
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn make_hit(id: &str, score: f64, shard: u32) -> Value {
json!({
"id": id,
"title": format!("Document {}", id),
"_rankingScore": score,
"_miroir_shard": shard,
})
}
/// Make a hit without score (for testing RRF rank-based ordering).
fn make_hit_ranked(id: &str, shard: u32) -> Value {
json!({
"id": id,
"title": format!("Document {}", id),
"_rankingScore": 0.5, // RRF ignores score, only rank matters
"_miroir_shard": shard,
})
}
fn make_shard_response(
hits: Vec<Value>,
total_hits: u64,
processing_time: u64,
) -> ShardHitPage {
ShardHitPage {
body: json!({
"hits": hits,
"estimatedTotalHits": total_hits,
"processingTimeMs": processing_time,
"facetDistribution": {},
}),
}
}
#[test]
fn test_merge_basic() {
let input = MergeInput {
shard_hits: vec![make_shard_response(
vec![
make_hit("doc1", 0.9, 0),
make_hit("doc2", 0.7, 0),
],
100,
15,
)],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
assert_eq!(result.hits.len(), 2);
assert_eq!(result.estimated_total_hits, 100);
assert_eq!(result.processing_time_ms, 15);
assert!(!result.degraded);
// Score should be stripped.
let hit = &result.hits[0];
assert!(hit.get("_rankingScore").is_none());
assert!(hit.get("_miroir_shard").is_none());
assert_eq!(hit.get("id").unwrap(), "doc1");
}
#[test]
fn test_merge_global_sort() {
let input = MergeInput {
shard_hits: vec![
make_shard_response(vec![make_hit("doc1", 0.5, 0)], 50, 10),
make_shard_response(vec![make_hit("doc2", 0.9, 1)], 50, 10),
make_shard_response(vec![make_hit("doc3", 0.7, 2)], 50, 10),
],
offset: 0,
limit: 10,
client_requested_score: true,
facets: None,
};
let result = merge(input).unwrap();
assert_eq!(result.hits.len(), 3);
// RRF: all docs are at rank 0 in their respective shards.
// With equal ranks, tie-break by primary key (alphabetically).
assert_eq!(result.hits[0].get("id").unwrap(), "doc1");
assert_eq!(result.hits[1].get("id").unwrap(), "doc2");
assert_eq!(result.hits[2].get("id").unwrap(), "doc3");
}
#[test]
fn test_merge_rrf_rank_ordering() {
// Test RRF: higher rank (lower position) contributes more to RRF score.
// shard0: [low_rank_doc (rank 0), mid_rank_doc (rank 10)]
// shard1: [high_rank_doc (rank 0)]
// shard2: [mid_rank_doc (rank 0) - same doc appears in two shards!]
//
// Expected RRF scores:
// - low_rank_doc: 1/(60+0+1) = 1/61 (only in shard0)
// - high_rank_doc: 1/(60+0+1) = 1/61 (only in shard1)
// - mid_rank_doc: 1/(60+10+1) + 1/(60+0+1) = 1/71 + 1/61 (rank 10 in shard0, rank 0 in shard2)
//
// mid_rank_doc should win because it appears in multiple shards.
let mut shard0_hits = vec![];
let mut shard1_hits = vec![];
let mut shard2_hits = vec![];
// Build shard0: low_rank_doc at position 0, mid_rank_doc at position 10
shard0_hits.push(make_hit("low_rank_doc", 0.1, 0));
for i in 0..9 {
shard0_hits.push(make_hit(&format!("filler_0_{}", i), 0.5, 0));
}
shard0_hits.push(make_hit("mid_rank_doc", 0.2, 0));
// shard1: high_rank_doc at position 0
shard1_hits.push(make_hit("high_rank_doc", 0.3, 1));
// shard2: mid_rank_doc at position 0 (same doc appears again!)
shard2_hits.push(make_hit("mid_rank_doc", 0.4, 2));
let input = MergeInput {
shard_hits: vec![
make_shard_response(shard0_hits, 100, 10),
make_shard_response(shard1_hits, 100, 10),
make_shard_response(shard2_hits, 100, 10),
],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
// mid_rank_doc should be first because it appears in multiple shards
// (RRF contributions sum: rank 10 in shard0 + rank 0 in shard2)
assert_eq!(result.hits[0].get("id").unwrap(), "mid_rank_doc");
// low_rank_doc and high_rank_doc both at rank 0 in their shards
// Tie-break by primary key alphabetically
assert_eq!(result.hits[1].get("id").unwrap(), "high_rank_doc");
assert_eq!(result.hits[2].get("id").unwrap(), "low_rank_doc");
}
#[test]
fn test_merge_rrf_duplicate_handling() {
// Test that the same document appearing in multiple shards
// gets its RRF score summed.
//
// doc1 appears at rank 0 in shard0 and rank 5 in shard1
// doc2 appears at rank 0 in shard2
//
// RRF(doc1) = 1/(60+0+1) + 1/(60+5+1) = 1/61 + 1/66
// RRF(doc2) = 1/(60+0+1) = 1/61
//
// doc1 should rank higher.
let shard0_hits = vec![make_hit("doc1", 0.1, 0)];
let mut shard1_hits = vec![];
let shard2_hits = vec![make_hit("doc2", 0.9, 2)];
// Add filler hits to shard1 to make doc1 appear at rank 5
for i in 0..5 {
shard1_hits.push(make_hit(&format!("filler_1_{}", i), 0.5, 1));
}
shard1_hits.push(make_hit("doc1", 0.2, 1));
let input = MergeInput {
shard_hits: vec![
make_shard_response(shard0_hits, 50, 10),
make_shard_response(shard1_hits, 50, 10),
make_shard_response(shard2_hits, 50, 10),
],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
// doc1 should rank higher despite lower score because
// it appears in multiple shards and gets RRF contribution boost
assert_eq!(result.hits[0].get("id").unwrap(), "doc1");
assert_eq!(result.hits[1].get("id").unwrap(), "doc2");
}
#[test]
fn test_merge_offset_limit() {
let input = MergeInput {
shard_hits: vec![make_shard_response(
vec![
make_hit("doc1", 0.9, 0),
make_hit("doc2", 0.8, 0),
make_hit("doc3", 0.7, 0),
make_hit("doc4", 0.6, 0),
make_hit("doc5", 0.5, 0),
],
100,
10,
)],
offset: 1,
limit: 2,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
assert_eq!(result.hits.len(), 2);
assert_eq!(result.hits[0].get("id").unwrap(), "doc2");
assert_eq!(result.hits[1].get("id").unwrap(), "doc3");
}
#[test]
fn test_merge_preserves_score_when_requested() {
// RRF doesn't use scores for ranking, but we still preserve
// the original score field when requested.
let input = MergeInput {
shard_hits: vec![make_shard_response(
vec![make_hit("doc1", 0.9, 0)],
50,
10,
)],
offset: 0,
limit: 10,
client_requested_score: true,
facets: None,
};
let result = merge(input).unwrap();
assert_eq!(
result.hits[0].get("_rankingScore").unwrap().as_f64(),
Some(0.9)
);
}
#[test]
fn test_merge_rrf_ignores_scores() {
// Test that RRF ordering is based on rank, not score.
// Even though doc3 has highest score, it's ranked lower in its shard.
//
// shard0: doc2 at rank 0 with score 0.9 (same rank, tie-break by id)
// shard1: doc1 at rank 0 with score 0.1 (same rank, tie-break by id)
// shard2: doc3 at rank 2 (position 2) with score 1.0 (lower rank)
//
// RRF scores:
// - doc1: 1/61 (rank 0)
// - doc2: 1/61 (rank 0)
// - filler: 1/62 (rank 1)
// - doc3: 1/63 (rank 2)
//
// Ordering: doc1, doc2 (tie-break alphabetically), filler, then doc3
let shard0_hits = vec![make_hit("doc2", 0.9, 0)]; // High score, rank 0
let shard1_hits = vec![make_hit("doc1", 0.1, 0)]; // Low score, rank 0
let shard2_hits = vec![
make_hit("filler", 0.5, 2),
make_hit("filler2", 0.5, 2),
make_hit("doc3", 1.0, 2), // Highest score, but rank 2
];
let input = MergeInput {
shard_hits: vec![
make_shard_response(shard0_hits, 50, 10),
make_shard_response(shard1_hits, 50, 10),
make_shard_response(shard2_hits, 50, 10),
],
offset: 0,
limit: 10,
client_requested_score: true,
facets: None,
};
let result = merge(input).unwrap();
// doc1 and doc2 both at rank 0, tie-break alphabetically
assert_eq!(result.hits[0].get("id").unwrap(), "doc1");
assert_eq!(result.hits[1].get("id").unwrap(), "doc2");
// filler and filler2 at rank 1
assert_eq!(result.hits[2].get("id").unwrap(), "filler");
assert_eq!(result.hits[3].get("id").unwrap(), "filler2");
// doc3 at rank 2, comes last despite highest score
assert_eq!(result.hits[4].get("id").unwrap(), "doc3");
}
#[test]
fn test_merge_strips_miroir_fields() {
let input = MergeInput {
shard_hits: vec![make_shard_response(
vec![json!({
"id": "doc1",
"title": "Test",
"_rankingScore": 0.9,
"_miroir_shard": 0,
"_miroir_node": "node-1",
"_miroir_group": 1,
})],
50,
10,
)],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
let hit = &result.hits[0];
assert!(hit.get("_rankingScore").is_none());
assert!(hit.get("_miroir_shard").is_none());
assert!(hit.get("_miroir_node").is_none());
assert!(hit.get("_miroir_group").is_none());
// Non-reserved fields preserved.
assert_eq!(hit.get("id").unwrap(), "doc1");
assert_eq!(hit.get("title").unwrap(), "Test");
}
#[test]
fn test_merge_facets() {
let shard1 = ShardHitPage {
body: json!({
"hits": [],
"estimatedTotalHits": 100,
"processingTimeMs": 10,
"facetDistribution": {
"category": {
"electronics": 50,
"books": 30,
},
"brand": {
"apple": 20,
},
},
}),
};
let shard2 = ShardHitPage {
body: json!({
"hits": [],
"estimatedTotalHits": 100,
"processingTimeMs": 15,
"facetDistribution": {
"category": {
"electronics": 40,
"clothing": 25,
},
"brand": {
"samsung": 15,
},
},
}),
};
let input = MergeInput {
shard_hits: vec![shard1, shard2],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
let facets = result.facet_distribution.unwrap();
// Check category merging.
let category = facets.get("category").unwrap();
assert_eq!(category.get("electronics"), Some(&90));
assert_eq!(category.get("books"), Some(&30));
assert_eq!(category.get("clothing"), Some(&25));
// Check brand merging.
let brand = facets.get("brand").unwrap();
assert_eq!(brand.get("apple"), Some(&20));
assert_eq!(brand.get("samsung"), Some(&15));
}
#[test]
fn test_merge_facets_filter() {
let shard = ShardHitPage {
body: json!({
"hits": [],
"estimatedTotalHits": 100,
"processingTimeMs": 10,
"facetDistribution": {
"category": {"electronics": 50},
"brand": {"apple": 20},
},
}),
};
let input = MergeInput {
shard_hits: vec![shard],
offset: 0,
limit: 10,
client_requested_score: false,
facets: Some(vec!["category".to_string()]),
};
let result = merge(input).unwrap();
let facets = result.facet_distribution.unwrap();
assert!(facets.contains_key("category"));
assert!(!facets.contains_key("brand"));
}
#[test]
fn test_merge_estimated_total_hits_sum() {
let input = MergeInput {
shard_hits: vec![
make_shard_response(vec![], 100, 10),
make_shard_response(vec![], 150, 15),
make_shard_response(vec![], 200, 20),
],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
assert_eq!(result.estimated_total_hits, 450);
}
#[test]
fn test_merge_processing_time_max() {
let input = MergeInput {
shard_hits: vec![
make_shard_response(vec![], 100, 10),
make_shard_response(vec![], 100, 25),
make_shard_response(vec![], 100, 15),
],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
assert_eq!(result.processing_time_ms, 25);
}
#[test]
fn test_merge_tie_breaking() {
let input = MergeInput {
shard_hits: vec![
make_shard_response(vec![make_hit("zebra", 0.5, 0)], 50, 10),
make_shard_response(vec![make_hit("apple", 0.5, 1)], 50, 10),
],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
// RRF: both docs at rank 0 in their shards, equal RRF scores.
// Tie-break by primary key (apple < zebra lexicographically).
assert_eq!(result.hits[0].get("id").unwrap(), "apple");
assert_eq!(result.hits[1].get("id").unwrap(), "zebra");
}
#[test]
fn test_merge_degraded_flag() {
let failed_shard = ShardHitPage {
body: json!({
"success": false,
"message": "node unavailable",
}),
};
let input = MergeInput {
shard_hits: vec![
make_shard_response(vec![make_hit("doc1", 0.9, 0)], 50, 10),
failed_shard,
],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result = merge(input).unwrap();
assert!(result.degraded);
}
#[test]
fn test_stable_serialization() {
let shard1 = ShardHitPage {
body: json!({
"hits": [make_hit("doc1", 0.9, 0)],
"estimatedTotalHits": 100,
"processingTimeMs": 10,
"facetDistribution": {
"category": {"electronics": 50, "books": 30},
},
}),
};
let shard2 = ShardHitPage {
body: json!({
"hits": [make_hit("doc2", 0.8, 1)],
"estimatedTotalHits": 100,
"processingTimeMs": 15,
"facetDistribution": {
"brand": {"apple": 20},
},
}),
};
let input = MergeInput {
shard_hits: vec![shard1.clone(), shard2.clone()],
offset: 0,
limit: 10,
client_requested_score: false,
facets: None,
};
let result1 = merge(input.clone()).unwrap();
let result2 = merge(input).unwrap();
// Serialize both to JSON.
let json1 = serde_json::to_value(&result1).unwrap();
let json2 = serde_json::to_value(&result2).unwrap();
// Byte-identical.
assert_eq!(
serde_json::to_vec(&json1).unwrap(),
serde_json::to_vec(&json2).unwrap()
);
}
#[test]
fn test_page_reconstruction() {
// Test that pages of 10 reconstruct a single limit=50 result.
let mut all_hits = Vec::new();
for i in 0..50 {
all_hits.push(make_hit(&format!("doc{:02}", i), (50 - i) as f64 / 100.0, 0));
}
let input = MergeInput {
shard_hits: vec![make_shard_response(all_hits, 50, 10)],
offset: 0,
limit: 50,
client_requested_score: false,
facets: None,
};
let full_result = merge(input.clone()).unwrap();
assert_eq!(full_result.hits.len(), 50);
// Now fetch in pages of 10 and verify they match.
for page in 0..5 {
let page_input = MergeInput {
shard_hits: input.shard_hits.clone(),
offset: page * 10,
limit: 10,
client_requested_score: false,
facets: None,
};
let page_result = merge(page_input).unwrap();
let start = page * 10;
let end = start + 10;
assert_eq!(
page_result.hits,
full_result.hits[start..end],
"Page {} mismatch",
page
);
}
}
}

View file

@ -99,7 +99,7 @@ pub fn count_assignment_diff(
#[cfg(test)]
mod tests {
use super::*;
use crate::topology::NodeId;
use crate::topology::{Node, NodeId};
use std::collections::HashMap;
/// Test 1: Determinism — same inputs always produce the same output.
@ -336,4 +336,228 @@ mod tests {
// Verify determinism
assert_eq!(score(0, node_a), score_0_a, "Score is non-deterministic");
}
// ── P1.3 acceptance tests: write_targets, query_group, covering_set ───
/// P1.3-A1: write_targets returns exactly RG × RF nodes (counting duplicates).
#[test]
fn test_write_targets_returns_rg_x_rf_nodes() {
let mut topo = Topology::new(64, 3, 2);
// Add 5 nodes to each of 3 groups
for i in 0u32..15 {
let rg = i / 5;
topo.add_node(Node::new(
NodeId::new(format!("node-{i}")),
format!("http://node-{i}:7700"),
rg,
));
}
let targets = write_targets(0, &topo);
// RG=3, RF=2 → 6 nodes total (may include duplicates)
assert_eq!(targets.len(), 6, "write_targets should return RG × RF nodes");
}
/// P1.3-A2: write_targets assigns one-per-group.
#[test]
fn test_write_targets_one_per_group() {
let mut topo = Topology::new(64, 2, 2);
// Group 0: nodes 0-2, Group 1: nodes 3-5
for i in 0u32..6 {
let rg = if i < 3 { 0 } else { 1 };
topo.add_node(Node::new(
NodeId::new(format!("node-{i}")),
format!("http://node-{i}:7700"),
rg,
));
}
let shard_id = 7;
let targets = write_targets(shard_id, &topo);
// Verify that the subset in group 0 matches assign_shard_in_group
let g0 = topo.group(0).unwrap();
let g0_targets: Vec<_> = targets
.iter()
.filter(|n| g0.nodes().contains(n))
.collect();
let g0_expected = assign_shard_in_group(shard_id, g0.nodes(), 2);
assert_eq!(
g0_targets.len(),
g0_expected.len(),
"Group 0 should have exactly RF nodes"
);
for node in &g0_expected {
assert!(g0_targets.contains(&node), "Group 0 missing expected node");
}
// Verify that the subset in group 1 matches assign_shard_in_group
let g1 = topo.group(1).unwrap();
let g1_targets: Vec<_> = targets
.iter()
.filter(|n| g1.nodes().contains(n))
.collect();
let g1_expected = assign_shard_in_group(shard_id, g1.nodes(), 2);
assert_eq!(
g1_targets.len(),
g1_expected.len(),
"Group 1 should have exactly RF nodes"
);
for node in &g1_expected {
assert!(g1_targets.contains(&node), "Group 1 missing expected node");
}
}
/// P1.3-A3: covering_set covers all shards within the chosen group.
#[test]
fn test_covering_set_covers_all_shards() {
let mut topo = Topology::new(16, 1, 2);
for i in 0u32..4 {
topo.add_node(Node::new(
NodeId::new(format!("node-{i}")),
format!("http://node-{i}:7700"),
0,
));
}
let group = topo.group(0).unwrap();
let shard_count = 16;
let covering = covering_set(shard_count, group, 2, 0);
// Verify that every shard is represented in the covering set
for shard_id in 0..shard_count {
let replicas = assign_shard_in_group(shard_id, group.nodes(), 2);
let selected = &replicas[0]; // query_seq=0 → first replica
assert!(
covering.contains(selected),
"Shard {}'s selected node {:?} not in covering set",
shard_id,
selected
);
}
}
/// P1.3-A4: covering_set size is bounded by Ng (nodes in group).
#[test]
fn test_covering_set_size_bound() {
let mut topo = Topology::new(1000, 1, 3);
for i in 0u32..5 {
topo.add_node(Node::new(
NodeId::new(format!("node-{i}")),
format!("http://node-{i}:7700"),
0,
));
}
let group = topo.group(0).unwrap();
let ng = group.node_count();
let covering = covering_set(1000, group, 3, 0);
assert!(
covering.len() <= ng,
"covering_set size {} exceeds group node count {}",
covering.len(),
ng
);
}
/// P1.3-A5: Two identical Topologies produce identical covering_set outputs.
#[test]
fn test_covering_set_determinism() {
let mut topo1 = Topology::new(64, 2, 2);
let mut topo2 = Topology::new(64, 2, 2);
for i in 0u32..6 {
let rg = if i < 3 { 0 } else { 1 };
let node = Node::new(
NodeId::new(format!("node-{i}")),
format!("http://node-{i}:7700"),
rg,
);
topo1.add_node(node.clone());
topo2.add_node(node);
}
let g1 = topo1.group(0).unwrap();
let g2 = topo2.group(0).unwrap();
for query_seq in 0..10 {
let c1 = covering_set(64, g1, 2, query_seq);
let c2 = covering_set(64, g2, 2, query_seq);
// Compare as sets since order may vary due to HashSet iteration
let s1: std::collections::HashSet<_> = c1.into_iter().collect();
let s2: std::collections::HashSet<_> = c2.into_iter().collect();
assert_eq!(
s1, s2,
"covering_set differs for identical topologies at query_seq={}",
query_seq
);
}
}
/// P1.3-A6: query_group distribution is uniform (chi-square test).
#[test]
fn test_query_group_uniform_distribution() {
let replica_groups = 5u32;
let samples = 10_000;
let mut counts = vec![0usize; replica_groups as usize];
for query_seq in 0..samples {
let g = query_group(query_seq as u64, replica_groups);
counts[g as usize] += 1;
}
// Expected count per group: samples / RG
let expected = samples as f64 / replica_groups as f64;
// Chi-square statistic: sum((observed - expected)^2 / expected)
let chi_square: f64 = counts
.iter()
.map(|&observed| {
let diff = observed as f64 - expected;
(diff * diff) / expected
})
.sum();
// Degrees of freedom = RG - 1 = 4
// Critical value at p=0.95 is ~9.49
let critical_value = 9.49;
assert!(
chi_square < critical_value,
"query_group distribution not uniform: chi-square={} > {}",
chi_square,
critical_value
);
}
/// P1.3-A7: covering_set rotates replicas by query_seq.
#[test]
fn test_covering_set_rotates_replicas() {
let mut topo = Topology::new(8, 1, 3);
for i in 0u32..4 {
topo.add_node(Node::new(
NodeId::new(format!("node-{i}")),
format!("http://node-{i}:7700"),
0,
));
}
let group = topo.group(0).unwrap();
let c0 = covering_set(8, group, 3, 0);
let c1 = covering_set(8, group, 3, 1);
let c2 = covering_set(8, group, 3, 2);
// For each shard, verify that the selected node rotates
for shard_id in 0..8 {
let replicas = assign_shard_in_group(shard_id, group.nodes(), 3);
let r0 = &replicas[0];
let r1 = &replicas[1];
let r2 = &replicas[2];
assert!(c0.contains(r0), "query_seq=0 should select first replica");
assert!(c1.contains(r1), "query_seq=1 should select second replica");
assert!(c2.contains(r2), "query_seq=2 should select third replica");
}
}
}

View file

@ -1,81 +1,548 @@
//! Scatter orchestration: fan-out logic and covering set builder.
use crate::config::UnavailableShardPolicy;
use crate::merger::ShardHitPage;
use crate::router::{covering_set, query_group};
use crate::topology::{NodeId, Topology};
use crate::Result;
use serde_json::Value;
use std::collections::HashMap;
/// Scatter orchestrator: fans out requests to the covering set.
pub trait Scatter: Send + Sync {
/// Execute a scatter request to multiple nodes.
/// Scatter plan: the exact shard→node mapping for a search query.
///
/// Separating the plan from execution makes §13.20 `/explain` cheap —
/// the explain path generates the plan and returns it without touching any node.
#[derive(Debug, Clone)]
pub struct ScatterPlan {
/// Chosen replica group for this query (query_seq % RG).
pub chosen_group: u32,
/// Target shards to query (for §13.4 narrowing — initially all 0..S).
pub target_shards: Vec<u32>,
/// Resolved covering set: shard ID → node ID.
pub shard_to_node: HashMap<u32, NodeId>,
/// Deadline for the query in milliseconds.
pub deadline_ms: u32,
/// Whether hedging is eligible (reserved for §13.2 Phase 5).
pub hedging_eligible: bool,
}
/// HTTP client for communicating with a Meilisearch node.
///
/// This is the seam between `miroir-core` (pure, no network) and
/// `miroir-proxy` (HTTP client). Injecting it via a trait means unit tests
/// can provide a fake client; production binds `reqwest` via the trait impl.
pub trait NodeClient: Send + Sync {
/// Execute a search request on a single node.
///
/// Returns a map of node ID to response. Failed nodes are omitted
/// based on the unavailable shard policy.
fn scatter(
/// Returns the raw JSON response from the node.
async fn search_node(
&self,
topology: &Topology,
nodes: Vec<NodeId>,
request: ScatterRequest,
policy: UnavailableShardPolicy,
) -> Result<ScatterResponse>;
node: &NodeId,
address: &str,
request: &SearchRequest,
) -> std::result::Result<Value, NodeError>;
}
/// A scatter request to be sent to each node.
/// Error from a single node during scatter.
#[derive(Debug, Clone)]
pub struct ScatterRequest {
/// Request body (JSON or raw bytes).
pub body: Vec<u8>,
/// Request headers.
pub headers: Vec<(String, String)>,
/// HTTP method.
pub method: String,
/// Request path.
pub path: String,
pub enum NodeError {
/// Node timed out.
Timeout,
/// Node returned an error response.
HttpError { status: u16, body: String },
/// Network or connection error.
NetworkError(String),
}
/// Response from a scatter operation.
/// A search request to be sent to each node in the covering set.
#[derive(Debug, Clone)]
pub struct ScatterResponse {
/// Responses from successful nodes.
pub responses: Vec<NodeResponse>,
pub struct SearchRequest {
/// Index UID being queried.
pub index_uid: String,
/// Nodes that failed or timed out.
pub failed: Vec<NodeId>,
/// Search query (q parameter).
pub query: Option<String>,
/// Offset for pagination.
pub offset: usize,
/// Limit for pagination.
pub limit: usize,
/// Filter expression.
pub filter: Option<Value>,
/// Facets to compute.
pub facets: Option<Vec<String>>,
/// Whether to return ranking scores.
pub ranking_score: bool,
/// Raw JSON body for the search request (captures any other parameters).
pub body: Value,
}
/// Response from a single node.
#[derive(Debug, Clone)]
pub struct NodeResponse {
/// Node that responded.
pub node_id: NodeId,
/// Result of a scatter operation.
#[derive(Debug)]
pub struct ScatterResult {
/// Responses from successfully contacted nodes.
pub shard_pages: Vec<ShardHitPage>,
/// Response body.
pub body: Vec<u8>,
/// Errors from nodes that failed (shard ID → error).
pub failed_shards: HashMap<u32, NodeError>,
/// HTTP status code.
pub status: u16,
/// Whether the response is partial (some shards failed).
pub partial: bool,
/// Response headers.
pub headers: Vec<(String, String)>,
/// Whether any node exceeded the deadline.
pub deadline_exceeded: bool,
}
/// Default stub implementation of Scatter.
#[derive(Debug, Clone, Default)]
pub struct StubScatter;
/// Construct a scatter plan for a search query.
///
/// This is a pure function — no async, no I/O. It selects the replica group,
/// computes the covering set, and maps each shard to its target node.
///
/// # Arguments
/// * `topology` - Current cluster topology
/// * `query_seq` - Query sequence number for group selection and load balancing
/// * `rf` - Replication factor (redundant with topology.rf, kept for explicitness)
/// * `shard_count` - Number of shards to query (typically topology.shards)
///
/// # Returns
/// A `ScatterPlan` containing the covering set and metadata for execution.
pub fn plan_search_scatter(
topology: &Topology,
query_seq: u64,
rf: usize,
shard_count: u32,
) -> ScatterPlan {
let chosen_group = query_group(query_seq, topology.replica_group_count());
impl Scatter for StubScatter {
fn scatter(
&self,
_topology: &Topology,
_nodes: Vec<NodeId>,
_request: ScatterRequest,
_policy: UnavailableShardPolicy,
) -> Result<ScatterResponse> {
Ok(ScatterResponse {
responses: Vec::new(),
failed: Vec::new(),
})
// Get the target group
let group = match topology.group(chosen_group) {
Some(g) => g,
None => {
// Invalid group ID — return empty plan (should not happen with valid topology)
return ScatterPlan {
chosen_group,
target_shards: Vec::new(),
shard_to_node: HashMap::new(),
deadline_ms: 0,
hedging_eligible: false,
};
}
};
// Compute covering set: one node per shard within the chosen group
let _covering = covering_set(shard_count, group, rf, query_seq);
// Build shard → node mapping
let mut shard_to_node = HashMap::new();
for shard_id in 0..shard_count {
let replicas = crate::router::assign_shard_in_group(shard_id, group.nodes(), rf);
// Rotate through replicas for intra-group load balancing
let selected = replicas[(query_seq as usize) % replicas.len()].clone();
shard_to_node.insert(shard_id, selected);
}
// Initially target all shards
let target_shards: Vec<u32> = (0..shard_count).collect();
// Default deadline: 5 seconds (configurable in production)
let deadline_ms = 5000;
// Hedging is eligible when we have multiple nodes in the group (reserved for §13.2)
let hedging_eligible = group.node_count() > 1;
ScatterPlan {
chosen_group,
target_shards,
shard_to_node,
deadline_ms,
hedging_eligible,
}
}
/// Execute a scatter operation against the covering set.
///
/// Fans out the search request to all nodes in the plan, handling partial
/// failures according to the unavailable shard policy.
///
/// # Arguments
/// * `plan` - Scatter plan from `plan_search_scatter`
/// * `client` - HTTP client for communicating with nodes
/// * `req` - Search request to execute
/// * `topology` - Current topology (for resolving node addresses)
/// * `policy` - Policy for handling unavailable shards
///
/// # Returns
/// A `ScatterResult` containing successful responses and any errors.
pub async fn execute_scatter<C: NodeClient>(
plan: ScatterPlan,
client: &C,
req: SearchRequest,
topology: &Topology,
policy: UnavailableShardPolicy,
) -> Result<ScatterResult> {
use std::collections::HashMap;
// Group requests by unique node (scatter happens once per node, not per shard)
let mut node_to_shards: HashMap<NodeId, Vec<u32>> = HashMap::new();
for (&shard_id, node_id) in &plan.shard_to_node {
if plan.target_shards.contains(&shard_id) {
node_to_shards
.entry(node_id.clone())
.or_default()
.push(shard_id);
}
}
let mut shard_pages = Vec::new();
let mut failed_shards = HashMap::new();
let mut deadline_exceeded = false;
// Execute requests in parallel (one per unique node)
let mut tasks = Vec::new();
for (node_id, shards) in node_to_shards {
let node = match topology.node(&node_id) {
Some(n) => n.clone(),
None => {
// Node not found in topology — mark all its shards as failed
for shard_id in shards {
failed_shards.insert(
shard_id,
NodeError::NetworkError("node not in topology".to_string()),
);
}
continue;
}
};
let client_ref = client;
let req_clone = req.clone();
let node_id_clone = node_id.clone();
tasks.push(async move {
let result = client_ref
.search_node(&node_id_clone, &node.address, &req_clone)
.await;
(node_id_clone, shards, result)
});
}
// Await all tasks
let results = futures_util::future::join_all(tasks).await;
for (_node_id, shards, result) in results {
match result {
Ok(body) => {
// Create a ShardHitPage for each shard served by this node
for _shard_id in shards {
shard_pages.push(ShardHitPage { body: body.clone() });
}
}
Err(NodeError::Timeout) => {
deadline_exceeded = true;
for shard_id in shards {
failed_shards.insert(shard_id, NodeError::Timeout);
}
}
Err(e) => {
for shard_id in shards {
failed_shards.insert(shard_id, e.clone());
}
}
}
}
// Determine if response is partial
let partial = !failed_shards.is_empty();
// Apply unavailable shard policy
match policy {
UnavailableShardPolicy::Error => {
if !failed_shards.is_empty() {
return Err(crate::error::MiroirError::Routing(format!(
"{} shard(s) unavailable",
failed_shards.len()
)));
}
}
UnavailableShardPolicy::Partial => {
// Return partial results (already done)
}
UnavailableShardPolicy::Fallback => {
// Reserved for §13.2 Phase 5: query other replica groups for failed shards
// For now, treat as Partial
}
}
Ok(ScatterResult {
shard_pages,
failed_shards,
partial,
deadline_exceeded,
})
}
/// Stubs for testing (no actual network calls).
/// Mock `NodeClient` for testing.
#[derive(Debug, Clone, Default)]
pub struct MockNodeClient {
/// Optional pre-programmed responses per node ID.
pub responses: HashMap<NodeId, Value>,
/// Optional pre-programmed errors per node ID.
pub errors: HashMap<NodeId, NodeError>,
/// Optional delay for simulating slow nodes.
pub delay_ms: u64,
}
impl NodeClient for MockNodeClient {
async fn search_node(
&self,
node: &NodeId,
_address: &str,
_request: &SearchRequest,
) -> std::result::Result<Value, NodeError> {
// Simulate network delay if configured
// Note: actual sleep requires tokio runtime; this is a no-op placeholder
let _ = self.delay_ms;
// Check for pre-programmed error
if let Some(err) = self.errors.get(node) {
return Err(err.clone());
}
// Return pre-programmed response or default empty response
Ok(self.responses.get(node).cloned().unwrap_or_else(|| {
serde_json::json!({
"hits": [],
"estimatedTotalHits": 0,
"processingTimeMs": 0,
"facetDistribution": {},
})
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::topology::{Node, NodeId};
fn make_test_topology() -> Topology {
let mut topo = Topology::new(64, 2, 2);
for i in 0u32..6 {
let rg = if i < 3 { 0 } else { 1 };
let mut node = Node::new(
NodeId::new(format!("node-{i}")),
format!("http://node-{i}:7700"),
rg,
);
node.status = crate::topology::NodeStatus::Active;
topo.add_node(node);
}
topo
}
#[test]
fn test_plan_search_scatter_pure_function() {
let topo = make_test_topology();
let plan = plan_search_scatter(&topo, 0, 2, 64);
assert_eq!(plan.chosen_group, 0);
assert_eq!(plan.target_shards.len(), 64);
assert_eq!(plan.shard_to_node.len(), 64);
assert_eq!(plan.deadline_ms, 5000);
assert!(plan.hedging_eligible);
}
#[test]
fn test_plan_search_scatter_query_group_rotation() {
let topo = make_test_topology();
// query_seq 0 → group 0
let plan0 = plan_search_scatter(&topo, 0, 2, 64);
assert_eq!(plan0.chosen_group, 0);
// query_seq 1 → group 1
let plan1 = plan_search_scatter(&topo, 1, 2, 64);
assert_eq!(plan1.chosen_group, 1);
// query_seq 2 → group 0
let plan2 = plan_search_scatter(&topo, 2, 2, 64);
assert_eq!(plan2.chosen_group, 0);
}
#[test]
fn test_plan_search_scatter_shard_to_node_mapping() {
let topo = make_test_topology();
let plan = plan_search_scatter(&topo, 0, 2, 64);
// All shards should be mapped to a node
for shard_id in 0..64 {
assert!(
plan.shard_to_node.contains_key(&shard_id),
"Shard {} not in mapping",
shard_id
);
}
// All nodes should be from group 0
let g0 = topo.group(0).unwrap();
for (_shard_id, node_id) in &plan.shard_to_node {
assert!(
g0.nodes().contains(node_id),
"Node {:?} not in group 0",
node_id
);
}
}
#[test]
fn test_plan_search_scatter_hedging_eligibility() {
let mut topo = Topology::new(64, 1, 1);
// Single node group
topo.add_node(Node::new(
NodeId::new("node-0".to_string()),
"http://node-0:7700".to_string(),
0,
));
let plan = plan_search_scatter(&topo, 0, 1, 64);
assert!(!plan.hedging_eligible);
// Multi-node group
let topo = make_test_topology();
let plan = plan_search_scatter(&topo, 0, 2, 64);
assert!(plan.hedging_eligible);
}
#[tokio::test]
async fn test_execute_scatter_with_mock_client() {
let topo = make_test_topology();
let plan = plan_search_scatter(&topo, 0, 2, 64);
let mut client = MockNodeClient::default();
client.responses.insert(
NodeId::new("node-0".to_string()),
serde_json::json!({
"hits": [{"id": "doc1", "title": "Test"}],
"estimatedTotalHits": 1,
"processingTimeMs": 5,
}),
);
let req = SearchRequest {
index_uid: "test".to_string(),
query: Some("test".to_string()),
offset: 0,
limit: 10,
filter: None,
facets: None,
ranking_score: false,
body: serde_json::json!({}),
};
let result = execute_scatter(plan, &client, req, &topo, UnavailableShardPolicy::Partial)
.await
.unwrap();
assert!(!result.partial);
assert!(!result.deadline_exceeded);
assert_eq!(result.shard_pages.len(), 64); // One page per shard
assert!(result.failed_shards.is_empty());
}
#[tokio::test]
async fn test_execute_scatter_partial_failure() {
let topo = make_test_topology();
let plan = plan_search_scatter(&topo, 0, 2, 64);
let mut client = MockNodeClient::default();
// Make node-0 fail
client.errors.insert(
NodeId::new("node-0".to_string()),
NodeError::Timeout,
);
client.responses.insert(
NodeId::new("node-1".to_string()),
serde_json::json!({
"hits": [],
"estimatedTotalHits": 0,
"processingTimeMs": 0,
}),
);
let req = SearchRequest {
index_uid: "test".to_string(),
query: Some("test".to_string()),
offset: 0,
limit: 10,
filter: None,
facets: None,
ranking_score: false,
body: serde_json::json!({}),
};
let result = execute_scatter(plan, &client, req, &topo, UnavailableShardPolicy::Partial)
.await
.unwrap();
assert!(result.partial);
assert!(!result.failed_shards.is_empty());
// Some shards should still succeed (those on node-1 and node-2)
assert!(!result.shard_pages.is_empty());
}
#[tokio::test]
async fn test_execute_scatter_error_policy() {
let topo = make_test_topology();
let plan = plan_search_scatter(&topo, 0, 2, 64);
let mut client = MockNodeClient::default();
client.errors.insert(
NodeId::new("node-0".to_string()),
NodeError::Timeout,
);
let req = SearchRequest {
index_uid: "test".to_string(),
query: Some("test".to_string()),
offset: 0,
limit: 10,
filter: None,
facets: None,
ranking_score: false,
body: serde_json::json!({}),
};
let result = execute_scatter(plan, &client, req, &topo, UnavailableShardPolicy::Error).await;
assert!(result.is_err());
}
#[test]
fn test_node_error_variants() {
let timeout = NodeError::Timeout;
assert!(matches!(timeout, NodeError::Timeout));
let http_err = NodeError::HttpError {
status: 500,
body: "Internal Server Error".to_string(),
};
assert!(matches!(http_err, NodeError::HttpError { .. }));
let net_err = NodeError::NetworkError("connection refused".to_string());
assert!(matches!(net_err, NodeError::NetworkError(_)));
}
}

View file

@ -0,0 +1,11 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc c28f459e669c9568d88ad15c1e23b7900c447c465fb405ac76ca127d5a5d7d69 # shrinks to shard_count = 23, node_count = 5, rf = 2
cc 5e225fd981de25480c87d5a319e25c7e85e68ec4e7d2674e4d09b38939c916fc # shrinks to shard_count = 17, node_count = 8, rf = 1
cc 2f0a493a306793420b97ed4c9c15f507f69e558169e0e47035ecfffd2de0d40c # shrinks to shard_count = 10, node_count = 6, rf = 1
cc 819e50f063ba8bc32df8ed79259c439512ee816a34e0cd02a56e312c46447812 # shrinks to shard_count = 20, node_count = 7, rf = 1
cc 8afd2c631dd0aae67601ab56fd1decde56c5c5048f5a79394913796e96f6a29f # shrinks to shard_count = 30, node_count = 4, rf = 1

View file

@ -245,6 +245,80 @@ def simulate_distributed_search(
}
RRF_K = 60 # RRF constant, matching merger.rs
def simulate_distributed_search_rrf(
shards: Dict[int, List[Dict]],
shard_stats: Dict[int, Tuple[Dict, int, float]],
query: Dict,
limit: int = 100,
) -> Dict:
"""
Simulate distributed search using Reciprocal Rank Fusion.
RRF score for a document: sum over shards of 1/(k + rank + 1)
where rank is 0-based position in shard's result list.
This avoids the score comparability issue entirely because
RRF only uses rank position, not raw scores.
"""
query_terms = tokenize(query["q"])
per_shard_limit = limit * 2
# Accumulate RRF scores per document
rrf_scores: Dict[str, float] = defaultdict(float)
doc_info: Dict[str, Tuple[Dict, int]] = {} # id -> (doc, shard_id)
for shard_id, docs in shards.items():
df, N, avgdl = shard_stats[shard_id]
if query.get("filter"):
category_filter = query["filter"].split("=")[1].strip()
filtered_docs = [d for d in docs if d["category"] == category_filter]
else:
filtered_docs = docs
scores = []
for doc in filtered_docs:
score = score_document_bm25(doc, query_terms, df, N, avgdl)
if score > 0:
scores.append((doc, score))
scores.sort(key=lambda x: x[1], reverse=True)
for rank, (doc, _score) in enumerate(scores[:per_shard_limit]):
doc_id = doc["id"]
rrf_contribution = 1.0 / (RRF_K + rank + 1)
rrf_scores[doc_id] += rrf_contribution
if doc_id not in doc_info:
doc_info[doc_id] = (doc, shard_id)
# Sort by RRF score descending
sorted_docs = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
hits = []
for doc_id, rrf_score in sorted_docs[:limit]:
doc, shard_id = doc_info[doc_id]
hits.append({
"id": doc_id,
"title": doc["title"],
"score": rrf_score,
"shard": shard_id,
})
return {
"query_id": query["id"],
"type": query.get("type", "unknown"),
"q": query["q"],
"filter": query.get("filter"),
"hits": hits,
"total_hits": len(sorted_docs),
"shards_queried": list(shards.keys()),
"merge_strategy": "rrf",
}
def run_experiment(
corpus_dir: Path,
query_file: Path,
@ -293,10 +367,13 @@ def run_experiment(
ground_truth_file = output_dir / "ground-truth.jsonl"
distributed_file = output_dir / "distributed.jsonl"
rrf_file = output_dir / "distributed-rrf.jsonl"
print(f"\nRunning experiments...")
with open(ground_truth_file, "w") as gt_f, open(distributed_file, "w") as dist_f:
with open(ground_truth_file, "w") as gt_f, \
open(distributed_file, "w") as dist_f, \
open(rrf_file, "w") as rrf_f:
for i, query in enumerate(queries):
if (i + 1) % 1000 == 0:
print(f" Processed {i + 1} queries...")
@ -305,16 +382,23 @@ def run_experiment(
gt_result = simulate_search(docs, query, global_stats, limit)
gt_f.write(json.dumps(gt_result) + "\n")
# Distributed: each shard uses local statistics
# Distributed: each shard uses local statistics (score-based merge)
dist_result = simulate_distributed_search(
shards, shard_stats, query, limit
)
dist_f.write(json.dumps(dist_result) + "\n")
# RRF: rank-based merge (no score comparability needed)
rrf_result = simulate_distributed_search_rrf(
shards, shard_stats, query, limit
)
rrf_f.write(json.dumps(rrf_result) + "\n")
print(f" Completed {len(queries)} queries")
print(f"\nResults saved to:")
print(f" {ground_truth_file}")
print(f" {distributed_file}")
print(f" {rrf_file}")
# Save experiment metadata
exp_meta = {
@ -323,6 +407,8 @@ def run_experiment(
"shard_count": shard_count,
"limit": limit,
"total_queries": len(queries),
"merge_strategies": ["score", "rrf"],
"rrf_k": RRF_K,
"global_stats": {"N": global_stats[1], "avgdl": global_stats[2]},
"shard_stats": {
str(k): {"N": v[1], "avgdl": v[2]}
@ -381,6 +467,7 @@ def main():
print("\nTo compare results, run:")
print(f" python3 {output_dir}/compare.py {output_dir}/ground-truth.jsonl {output_dir}/distributed.jsonl --verbose")
print(f" python3 {output_dir}/compare.py {output_dir}/ground-truth.jsonl {output_dir}/distributed-rrf.jsonl --verbose")
if __name__ == "__main__":