Phase 2 cleanup: Remove superseded handler files
Remove index_handler.rs, search_handler.rs, and write.rs which were superseded by the new routes/ directory structure during Phase 2 implementation. The new routes/ module provides better organization: - routes/indexes.rs (index lifecycle) - routes/search.rs (search endpoint) - routes/documents.rs (document CRUD) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
cfacb5160e
commit
0e91b6286d
10 changed files with 4854 additions and 6410 deletions
File diff suppressed because one or more lines are too long
|
|
@ -5,11 +5,11 @@
|
|||
"model": "glm-4.7",
|
||||
"exit_code": 1,
|
||||
"outcome": "failure",
|
||||
"duration_ms": 202017,
|
||||
"duration_ms": 163133,
|
||||
"input_tokens": null,
|
||||
"output_tokens": null,
|
||||
"cost_usd": null,
|
||||
"captured_at": "2026-05-09T19:24:12.569700797Z",
|
||||
"captured_at": "2026-05-09T19:28:42.687338836Z",
|
||||
"trace_format": "claude_json",
|
||||
"pruned": false,
|
||||
"template_version": null
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -3,13 +3,13 @@
|
|||
"agent": "claude-code-glm-4.7",
|
||||
"provider": "zai",
|
||||
"model": "glm-4.7",
|
||||
"exit_code": 0,
|
||||
"outcome": "success",
|
||||
"duration_ms": 373870,
|
||||
"exit_code": 1,
|
||||
"outcome": "failure",
|
||||
"duration_ms": 240718,
|
||||
"input_tokens": null,
|
||||
"output_tokens": null,
|
||||
"cost_usd": null,
|
||||
"captured_at": "2026-05-09T19:25:59.028910100Z",
|
||||
"captured_at": "2026-05-09T19:28:13.519467484Z",
|
||||
"trace_format": "claude_json",
|
||||
"pruned": false,
|
||||
"template_version": null
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -1 +1 @@
|
|||
889f80ebb506d5576a338d3d3808417fca7b58d9
|
||||
cfacb5160e7acd5c4e95f3a725544f862a163e6c
|
||||
|
|
|
|||
|
|
@ -1,287 +0,0 @@
|
|||
//! Index lifecycle operations: create, delete, stats.
|
||||
|
||||
use crate::state::ProxyState;
|
||||
use miroir_core::topology::Topology;
|
||||
use miroir_core::{MiroirError, Result};
|
||||
use serde_json::{json, Value};
|
||||
use std::collections::HashMap;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Index lifecycle executor.
|
||||
pub struct IndexExecutor {
|
||||
state: ProxyState,
|
||||
}
|
||||
|
||||
impl IndexExecutor {
|
||||
pub fn new(state: ProxyState) -> Self {
|
||||
Self { state }
|
||||
}
|
||||
|
||||
/// Create an index on all nodes.
|
||||
pub async fn create_index(&self, uid: &str, primary_key: Option<&str>) -> Result<IndexResult> {
|
||||
let topology = self.state.topology().await;
|
||||
|
||||
// Prepare request body
|
||||
let mut body = json!({
|
||||
"uid": uid,
|
||||
});
|
||||
|
||||
if let Some(pk) = primary_key {
|
||||
body["primaryKey"] = json!(pk);
|
||||
}
|
||||
|
||||
let body_bytes = serde_json::to_vec(&body).unwrap();
|
||||
|
||||
// Broadcast to all nodes
|
||||
let mut node_tasks = HashMap::new();
|
||||
let mut failed_nodes = Vec::new();
|
||||
|
||||
for node in topology.nodes() {
|
||||
match self
|
||||
.state
|
||||
.client
|
||||
.send_to_node(
|
||||
&topology,
|
||||
&node.id,
|
||||
"POST",
|
||||
"/indexes",
|
||||
Some(&body_bytes),
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(resp) if (200..300).contains(&resp.status) => {
|
||||
if let Some(task_uid) = resp.body.get("taskUid").and_then(|v| v.as_u64()) {
|
||||
node_tasks.insert(node.id.as_str().to_string(), task_uid);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
failed_nodes.push(node.id.as_str().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !failed_nodes.is_empty() {
|
||||
// Rollback: delete from successful nodes
|
||||
for node_id in node_tasks.keys() {
|
||||
let _ = self
|
||||
.state
|
||||
.client
|
||||
.send_to_node(
|
||||
&topology,
|
||||
&node_id.clone().into(),
|
||||
"DELETE",
|
||||
&format!("/indexes/{}", uid),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
return Err(MiroirError::Routing(format!(
|
||||
"Failed to create index on nodes: {:?}",
|
||||
failed_nodes
|
||||
)));
|
||||
}
|
||||
|
||||
// Add _miroir_shard to filterable attributes
|
||||
self.add_miroir_shard_filterable(uid).await?;
|
||||
|
||||
let miroir_task_id = format!("mtask-{}", Uuid::new_v4());
|
||||
|
||||
Ok(IndexResult {
|
||||
miroir_task_id,
|
||||
node_tasks,
|
||||
})
|
||||
}
|
||||
|
||||
/// Delete an index from all nodes.
|
||||
pub async fn delete_index(&self, uid: &str) -> Result<IndexResult> {
|
||||
let topology = self.state.topology().await;
|
||||
|
||||
let mut node_tasks = HashMap::new();
|
||||
let mut failed_nodes = Vec::new();
|
||||
|
||||
for node in topology.nodes() {
|
||||
match self
|
||||
.state
|
||||
.client
|
||||
.send_to_node(
|
||||
&topology,
|
||||
&node.id,
|
||||
"DELETE",
|
||||
&format!("/indexes/{}", uid),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(resp) if (200..300).contains(&resp.status) => {
|
||||
if let Some(task_uid) = resp.body.get("taskUid").and_then(|v| v.as_u64()) {
|
||||
node_tasks.insert(node.id.as_str().to_string(), task_uid);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
failed_nodes.push(node.id.as_str().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !failed_nodes.is_empty() {
|
||||
return Err(MiroirError::Routing(format!(
|
||||
"Failed to delete index on nodes: {:?}",
|
||||
failed_nodes
|
||||
)));
|
||||
}
|
||||
|
||||
let miroir_task_id = format!("mtask-{}", Uuid::new_v4());
|
||||
|
||||
Ok(IndexResult {
|
||||
miroir_task_id,
|
||||
node_tasks,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get aggregated stats for an index.
|
||||
pub async fn get_stats(&self, uid: &str) -> Result<Value> {
|
||||
let topology = self.state.topology().await;
|
||||
|
||||
let mut total_documents = 0u64;
|
||||
let mut field_distribution: HashMap<String, u64> = HashMap::new();
|
||||
let mut failed_nodes = Vec::new();
|
||||
|
||||
for node in topology.nodes() {
|
||||
match self
|
||||
.state
|
||||
.client
|
||||
.send_to_node(
|
||||
&topology,
|
||||
&node.id,
|
||||
"GET",
|
||||
&format!("/indexes/{}/stats", uid),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(resp) if (200..300).contains(&resp.status) => {
|
||||
// Sum numberOfDocuments
|
||||
if let Some(count) = resp.body.get("numberOfDocuments").and_then(|v| v.as_u64()) {
|
||||
total_documents += count;
|
||||
}
|
||||
|
||||
// Merge fieldDistribution
|
||||
if let Some(fields) = resp.body.get("fieldDistribution").and_then(|v| v.as_object()) {
|
||||
for (field, count) in fields {
|
||||
let count_val = count.as_u64().unwrap_or(0);
|
||||
*field_distribution.entry(field.clone()).or_insert(0) += count_val;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
failed_nodes.push(node.id.as_str().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if failed_nodes.len() > topology.nodes().count() / 2 {
|
||||
return Err(MiroirError::Routing(format!(
|
||||
"Failed to get stats from majority of nodes: {:?}",
|
||||
failed_nodes
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(json!({
|
||||
"numberOfDocuments": total_documents,
|
||||
"fieldDistribution": field_distribution,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Add _miroir_shard to filterable attributes.
|
||||
async fn add_miroir_shard_filterable(&self, uid: &str) -> Result<()> {
|
||||
let topology = self.state.topology().await;
|
||||
|
||||
// Get current settings
|
||||
let first_node = topology.nodes().next();
|
||||
if let Some(node) = first_node {
|
||||
if let Ok(resp) = self
|
||||
.state
|
||||
.client
|
||||
.send_to_node(
|
||||
&topology,
|
||||
&node.id,
|
||||
"GET",
|
||||
&format!("/indexes/{}/settings/filterable-attributes", uid),
|
||||
None,
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
{
|
||||
if let Some(attrs) = resp.body.as_array() {
|
||||
let mut attrs_vec: Vec<String> = attrs
|
||||
.iter()
|
||||
.filter_map(|v| v.as_str().map(|s| s.to_string()))
|
||||
.collect();
|
||||
|
||||
if !attrs_vec.contains(&"_miroir_shard".to_string()) {
|
||||
attrs_vec.push("_miroir_shard".to_string());
|
||||
|
||||
let body = serde_json::to_vec(&attrs_vec).unwrap();
|
||||
|
||||
// Broadcast to all nodes
|
||||
for node in topology.nodes() {
|
||||
let _ = self
|
||||
.state
|
||||
.client
|
||||
.send_to_node(
|
||||
&topology,
|
||||
&node.id,
|
||||
"PUT",
|
||||
&format!("/indexes/{}/settings/filterable-attributes", uid),
|
||||
Some(&body),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of an index create or delete broadcast.
#[derive(Debug, Clone)]
pub struct IndexResult {
    /// Proxy-level task identifier, generated as `mtask-<uuid>`.
    pub miroir_task_id: String,
    /// Task UID reported by each node, keyed by the node's id string.
    pub node_tasks: HashMap<String, u64>,
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use miroir_core::config::MiroirConfig;

    /// Build an executor over a 64-shard, RF=2 configuration with all other
    /// settings left at their defaults.
    fn create_test_executor() -> IndexExecutor {
        let state = ProxyState::new(MiroirConfig {
            shards: 64,
            replication_factor: 2,
            ..Default::default()
        })
        .unwrap();

        IndexExecutor::new(state)
    }

    /// An `IndexResult` keeps the task id it was constructed with.
    #[tokio::test]
    async fn test_index_result_creation() {
        let node_tasks = HashMap::new();
        let result = IndexResult {
            miroir_task_id: "mtask-123".to_string(),
            node_tasks,
        };

        assert_eq!(result.miroir_task_id, "mtask-123");
    }
}
|
||||
|
|
@ -1,180 +0,0 @@
|
|||
//! Search read path: scatter-gather with result merging.
|
||||
|
||||
use crate::scatter::HttpScatter;
|
||||
use crate::state::ProxyState;
|
||||
use miroir_core::config::UnavailableShardPolicy;
|
||||
use miroir_core::merger::{Merger, MergerImpl, ShardResponse};
|
||||
use miroir_core::router;
|
||||
use miroir_core::scatter::ScatterRequest;
|
||||
use miroir_core::topology::{NodeId, Topology};
|
||||
use miroir_core::{MiroirError, Result};
|
||||
use serde_json::{json, Value};
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Search executor for scatter-gather queries.
|
||||
pub struct SearchExecutor {
|
||||
state: ProxyState,
|
||||
scatter: HttpScatter,
|
||||
merger: MergerImpl,
|
||||
}
|
||||
|
||||
impl SearchExecutor {
|
||||
pub fn new(state: ProxyState) -> Self {
|
||||
let node_timeout_ms = state.config.scatter.node_timeout_ms;
|
||||
let scatter = HttpScatter::new(state.client.clone(), node_timeout_ms);
|
||||
|
||||
Self {
|
||||
state,
|
||||
scatter,
|
||||
merger: MergerImpl,
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a search query across the covering set.
|
||||
pub async fn search(
|
||||
&self,
|
||||
index: &str,
|
||||
query: Value,
|
||||
offset: usize,
|
||||
limit: usize,
|
||||
) -> Result<SearchResult> {
|
||||
let topology = self.state.topology().await;
|
||||
let shard_count = self.state.config.shards;
|
||||
let rf = self.state.config.replication_factor as usize;
|
||||
let replica_groups = topology.replica_group_count();
|
||||
|
||||
// Select query group
|
||||
let query_seq = self.state.next_query_seq();
|
||||
let group_id = router::query_group(query_seq, replica_groups);
|
||||
|
||||
let group = topology
|
||||
.group(group_id)
|
||||
.ok_or_else(|| MiroirError::Routing(format!("Group {} not found", group_id)))?;
|
||||
|
||||
// Build covering set
|
||||
let covering = router::covering_set(shard_count, group, rf, query_seq);
|
||||
|
||||
// Deduplicate nodes
|
||||
let unique_nodes: std::collections::HashSet<_> = covering.into_iter().collect();
|
||||
|
||||
// Prepare search query
|
||||
let mut query_with_score = query.clone();
|
||||
if let Some(obj) = query_with_score.as_object_mut() {
|
||||
obj.insert("showRankingScore".to_string(), json!(true));
|
||||
}
|
||||
|
||||
let body = serde_json::to_vec(&query_with_score).unwrap();
|
||||
let path = format!("/indexes/{}/search", index);
|
||||
|
||||
let request = ScatterRequest {
|
||||
body,
|
||||
headers: vec![],
|
||||
method: "POST".to_string(),
|
||||
path,
|
||||
};
|
||||
|
||||
// Get policy from config
|
||||
let policy = match self.state.config.scatter.unavailable_shard_policy.as_str() {
|
||||
"error" => UnavailableShardPolicy::Error,
|
||||
"fallback" => UnavailableShardPolicy::Fallback,
|
||||
_ => UnavailableShardPolicy::Partial,
|
||||
};
|
||||
|
||||
// Scatter to covering set
|
||||
let response = self
|
||||
.scatter
|
||||
.scatter(&topology, unique_nodes.into_iter().collect(), request, policy)
|
||||
.await?;
|
||||
|
||||
// Convert node responses to shard responses
|
||||
let mut shard_responses: Vec<ShardResponse> = Vec::new();
|
||||
let mut degraded_shards = Vec::new();
|
||||
|
||||
for node_resp in response.responses {
|
||||
// Parse response as shard response (all shards from this node)
|
||||
shard_responses.push(ShardResponse {
|
||||
shard_id: 0, // We'll merge all responses together
|
||||
body: serde_json::from_slice(&node_resp.body).unwrap_or(json!({})),
|
||||
success: true,
|
||||
});
|
||||
}
|
||||
|
||||
for failed_node in &response.failed {
|
||||
degraded_shards.push(failed_node.as_str().to_string());
|
||||
}
|
||||
|
||||
// Check if client requested ranking score
|
||||
let client_requested_score = query
|
||||
.get("showRankingScore")
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(false);
|
||||
|
||||
// Merge results
|
||||
let merged = self
|
||||
.merger
|
||||
.merge(shard_responses, offset, limit, client_requested_score)?;
|
||||
|
||||
// Build response
|
||||
let mut result = json!({
|
||||
"hits": merged.hits,
|
||||
"processingTimeMs": merged.processing_time_ms,
|
||||
"query": query,
|
||||
});
|
||||
|
||||
if !merged.facets.is_null() {
|
||||
if let Some(obj) = result.as_object_mut() {
|
||||
obj.insert("facetDistribution".to_string(), merged.facets);
|
||||
}
|
||||
}
|
||||
|
||||
// Add estimatedTotalHits if present
|
||||
if merged.total_hits > 0 {
|
||||
if let Some(obj) = result.as_object_mut() {
|
||||
obj.insert("estimatedTotalHits".to_string(), json!(merged.total_hits));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(SearchResult {
|
||||
body: result,
|
||||
degraded: merged.degraded,
|
||||
degraded_shards,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of a search operation.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SearchResult {
|
||||
pub body: Value,
|
||||
pub degraded: bool,
|
||||
pub degraded_shards: Vec<String>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use miroir_core::config::MiroirConfig;

    /// Build an executor over a 64-shard, RF=2 configuration with all other
    /// settings left at their defaults.
    fn create_test_executor() -> SearchExecutor {
        let state = ProxyState::new(MiroirConfig {
            shards: 64,
            replication_factor: 2,
            ..Default::default()
        })
        .unwrap();

        SearchExecutor::new(state)
    }

    /// A `SearchResult` exposes the body and degraded flag it was built with.
    #[tokio::test]
    async fn test_search_result_creation() {
        let body = json!({"hits": []});
        let result = SearchResult {
            body,
            degraded: false,
            degraded_shards: vec![],
        };

        let hits = result.body["hits"].as_array().unwrap();
        assert_eq!(hits.len(), 0);
        assert!(!result.degraded);
    }
}
|
||||
|
|
@ -1,295 +0,0 @@
|
|||
//! Write path: document routing with hash-based sharding and quorum.
|
||||
|
||||
use crate::client::NodeClient;
|
||||
use crate::error_response::ErrorResponse;
|
||||
use crate::scatter::HttpScatter;
|
||||
use crate::state::ProxyState;
|
||||
use miroir_core::config::UnavailableShardPolicy;
|
||||
use miroir_core::router;
|
||||
use miroir_core::scatter::ScatterRequest;
|
||||
use miroir_core::topology::Topology;
|
||||
use miroir_core::{MiroirError, Result};
|
||||
use serde_json::{json, Map, Value};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::atomic::Ordering;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Write path executor for document batches.
|
||||
pub struct WriteExecutor {
|
||||
state: ProxyState,
|
||||
scatter: HttpScatter,
|
||||
}
|
||||
|
||||
impl WriteExecutor {
|
||||
pub fn new(state: ProxyState) -> Self {
|
||||
let node_timeout_ms = state.config.scatter.node_timeout_ms;
|
||||
let scatter = HttpScatter::new(state.client.clone(), node_timeout_ms);
|
||||
|
||||
Self { state, scatter }
|
||||
}
|
||||
|
||||
/// Execute a document write (add/replace) for an index.
|
||||
pub async fn write_documents(
|
||||
&self,
|
||||
index: &str,
|
||||
documents: Vec<Value>,
|
||||
primary_key: Option<&str>,
|
||||
) -> Result<WriteResult> {
|
||||
// Validate primary key is known
|
||||
let pk = self.resolve_primary_key(index, primary_key).await?;
|
||||
|
||||
// Hash documents by shard and group by target nodes
|
||||
let topology = self.state.topology().await;
|
||||
let shard_count = self.state.config.shards;
|
||||
let rf = self.state.config.replication_factor as usize;
|
||||
|
||||
let mut shard_groups: HashMap<u32, Vec<Value>> = HashMap::new();
|
||||
let mut reserved_field_errors = Vec::new();
|
||||
|
||||
for (idx, doc) in documents.iter().enumerate() {
|
||||
// Check for reserved fields
|
||||
if let Some(obj) = doc.as_object() {
|
||||
if obj.contains_key("_miroir_shard") {
|
||||
reserved_field_errors.push(idx);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Extract primary key value
|
||||
let pk_value = self.extract_pk_value(doc, &pk)?;
|
||||
let shard_id = router::shard_for_key(&pk_value, shard_count);
|
||||
|
||||
// Inject _miroir_shard
|
||||
let mut doc_with_shard = doc.clone();
|
||||
if let Some(obj) = doc_with_shard.as_object_mut() {
|
||||
obj.insert("_miroir_shard".to_string(), json!(shard_id));
|
||||
}
|
||||
|
||||
shard_groups
|
||||
.entry(shard_id)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(doc_with_shard);
|
||||
}
|
||||
|
||||
if !reserved_field_errors.is_empty() {
|
||||
return Err(MiroirError::Routing(format!(
|
||||
"{} documents contain reserved field _miroir_shard: {:?}",
|
||||
reserved_field_errors.len(),
|
||||
reserved_field_errors
|
||||
)));
|
||||
}
|
||||
|
||||
// For each shard, compute write targets and group by node
|
||||
let mut node_batches: HashMap<String, Vec<Value>> = HashMap::new();
|
||||
|
||||
for (shard_id, docs) in shard_groups {
|
||||
let targets = router::write_targets(shard_id, &topology);
|
||||
|
||||
for target in targets {
|
||||
let node = topology
|
||||
.node(&target)
|
||||
.ok_or_else(|| MiroirError::Routing(format!("node {} not found", target.as_str())))?;
|
||||
|
||||
node_batches
|
||||
.entry(node.id.as_str().to_string())
|
||||
.or_insert_with(Vec::new)
|
||||
.extend(docs.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Fan out writes to all nodes
|
||||
let miroir_task_id = format!("mtask-{}", Uuid::new_v4());
|
||||
|
||||
let mut node_tasks: HashMap<String, u64> = HashMap::new();
|
||||
let mut group_quorum: HashMap<u32, GroupQuorum> = HashMap::new();
|
||||
let mut failed_nodes = Vec::new();
|
||||
|
||||
for (node_id, docs) in node_batches {
|
||||
let body = serde_json::to_vec(&docs).unwrap();
|
||||
let path = format!("/indexes/{}/documents", index);
|
||||
|
||||
let request = ScatterRequest {
|
||||
body,
|
||||
headers: vec![],
|
||||
method: "POST".to_string(),
|
||||
path,
|
||||
};
|
||||
|
||||
// Send to this node
|
||||
let result = self
|
||||
.scatter
|
||||
.scatter(&topology, vec![node_id.clone().into()], request, UnavailableShardPolicy::Partial)
|
||||
.await?;
|
||||
|
||||
if let Some(resp) = result.responses.first() {
|
||||
// Parse response to get task UID
|
||||
if let Some(task_uid) = resp.body.get("taskUid").and_then(|v| v.as_u64()) {
|
||||
node_tasks.insert(node_id.clone(), task_uid);
|
||||
|
||||
// Track per-group quorum
|
||||
if let Some(node) = topology.node(&node_id.clone().into()) {
|
||||
let group_id = node.replica_group;
|
||||
let quorum = group_quorum.entry(group_id).or_insert_with(|| {
|
||||
GroupQuorum {
|
||||
group_id,
|
||||
rf,
|
||||
acked: HashSet::new(),
|
||||
}
|
||||
});
|
||||
quorum.acked.insert(node_id.clone());
|
||||
}
|
||||
} else {
|
||||
failed_nodes.push(node_id);
|
||||
}
|
||||
} else {
|
||||
failed_nodes.push(node_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Check quorum - write succeeds if at least one group met quorum
|
||||
let degraded_groups = self.check_quorum(&group_quorum, &topology);
|
||||
let any_group_met_quorum = group_quorum.values().any(|q| q.met_quorum());
|
||||
|
||||
if !any_group_met_quorum {
|
||||
return Err(MiroirError::Routing("No replica group met quorum".to_string()));
|
||||
}
|
||||
|
||||
Ok(WriteResult {
|
||||
miroir_task_id,
|
||||
node_tasks,
|
||||
degraded_groups,
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve_primary_key(&self, index: &str, primary_key: Option<&str>) -> Result<String> {
|
||||
if let Some(pk) = primary_key {
|
||||
return Ok(pk.to_string());
|
||||
}
|
||||
|
||||
// Query index to get primary key
|
||||
let topology = self.state.topology().await;
|
||||
let first_node = topology.nodes().next();
|
||||
|
||||
if let Some(node) = first_node {
|
||||
let resp = self
|
||||
.state
|
||||
.client
|
||||
.send_to_node(&topology, &node.id, "GET", &format!("/indexes/{}", index), None, &[])
|
||||
.await?;
|
||||
|
||||
if let Some(pk) = resp.body.get("primaryKey").and_then(|v| v.as_str()) {
|
||||
return Ok(pk.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Err(MiroirError::Routing(format!(
|
||||
"Index {} does not have a primary key",
|
||||
index
|
||||
)))
|
||||
}
|
||||
|
||||
fn extract_pk_value(&self, doc: &Value, pk: &str) -> Result<String> {
|
||||
let obj = doc
|
||||
.as_object()
|
||||
.ok_or_else(|| MiroirError::Routing("Document is not an object".to_string()))?;
|
||||
|
||||
let value = obj.get(pk).ok_or_else(|| {
|
||||
MiroirError::Routing(format!("Primary key '{}' not found in document", pk))
|
||||
})?;
|
||||
|
||||
Ok(value.to_string())
|
||||
}
|
||||
|
||||
/// List the ids of all tracked replica groups that did not meet quorum.
///
/// The order follows the iteration order of `group_quorum`. The `topology`
/// argument is accepted but not consulted.
fn check_quorum(&self, group_quorum: &HashMap<u32, GroupQuorum>, topology: &Topology) -> Vec<u32> {
    // Part of the signature only; nothing is read from it.
    let _ = topology;

    group_quorum
        .iter()
        .filter(|(_, tracker)| !tracker.met_quorum())
        .map(|(group_id, _)| *group_id)
        .collect()
}
|
||||
}
|
||||
|
||||
/// Outcome of a routed document write.
#[derive(Debug, Clone)]
pub struct WriteResult {
    /// Proxy-level task identifier, generated as `mtask-<uuid>`.
    pub miroir_task_id: String,
    /// Task UID reported by each acknowledging node, keyed by node id string.
    pub node_tasks: HashMap<String, u64>,
    /// Ids of the replica groups that did not meet quorum.
    pub degraded_groups: Vec<u32>,
}
|
||||
|
||||
/// Acknowledgement bookkeeping for one replica group during a write.
#[derive(Debug)]
struct GroupQuorum {
    /// Id of the replica group being tracked.
    group_id: u32,
    /// Replication factor used to derive the quorum size.
    rf: usize,
    /// Id strings of the nodes that acknowledged the write.
    acked: HashSet<String>,
}

impl GroupQuorum {
    /// True once a strict majority of `rf` nodes has acknowledged, i.e. at
    /// least `rf / 2 + 1` acknowledgements (integer division).
    fn met_quorum(&self) -> bool {
        // `len > rf / 2` is the same condition as `len >= rf / 2 + 1`.
        self.acked.len() > self.rf / 2
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use miroir_core::config::{MiroirConfig, NodeConfig, ServerConfig};
    use miroir_core::topology::{Node, NodeId, Topology};

    /// Build an executor over a 64-shard, RF=2 configuration with all other
    /// settings left at their defaults.
    fn create_test_executor() -> WriteExecutor {
        let state = ProxyState::new(MiroirConfig {
            shards: 64,
            replication_factor: 2,
            ..Default::default()
        })
        .unwrap();

        WriteExecutor::new(state)
    }

    /// Convenience constructor for a group-0, RF=3 tracker with the given acks.
    fn rf3_quorum(acked_nodes: &[&str]) -> GroupQuorum {
        GroupQuorum {
            group_id: 0,
            rf: 3,
            acked: acked_nodes.iter().map(|name| name.to_string()).collect(),
        }
    }

    /// A string primary key is returned as JSON text, quotes included.
    #[tokio::test]
    async fn test_extract_pk_value() {
        let executor = create_test_executor();
        let doc = json!({"id": "test123", "name": "foo"});

        let extracted = executor.extract_pk_value(&doc, "id").unwrap();
        assert_eq!(extracted, "\"test123\"");
    }

    /// A document without the primary-key field is rejected.
    #[tokio::test]
    async fn test_extract_pk_value_missing() {
        let executor = create_test_executor();
        let doc = json!({"name": "foo"});

        assert!(executor.extract_pk_value(&doc, "id").is_err());
    }

    /// Two acknowledgements out of RF=3 reach quorum: 2 >= (3/2)+1 = 2.
    #[tokio::test]
    async fn test_group_quorum_met() {
        let quorum = rf3_quorum(&["node1", "node2"]);

        assert!(quorum.met_quorum());
    }

    /// One acknowledgement out of RF=3 does not reach quorum: 1 < 2.
    #[tokio::test]
    async fn test_group_quorum_not_met() {
        let quorum = rf3_quorum(&["node1"]);

        assert!(!quorum.met_quorum());
    }
}
|
||||
193
notes/miroir-9dj-phase2-completion.md
Normal file
193
notes/miroir-9dj-phase2-completion.md
Normal file
|
|
@ -0,0 +1,193 @@
|
|||
# Phase 2 — Proxy + API Surface: Completion Verification
|
||||
|
||||
## Date
|
||||
2026-05-09
|
||||
|
||||
## Scope Verification
|
||||
|
||||
### 1. axum server listening on port 7700 and metrics on 9090
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/main.rs`
|
||||
- Server configured to listen on `server.port` (default 7700)
|
||||
- Metrics server on port 9090 with `/metrics` endpoint
|
||||
|
||||
### 2. Write path implementation
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/routes/documents.rs`
|
||||
- Hash primary key to get shard ID (`shard_for_key()`)
|
||||
- Inject `_miroir_shard` field into documents
|
||||
- Fan out to RG × RF nodes (number of replica groups × replication factor)
|
||||
- Per-group quorum (`floor(RF/2)+1`)
|
||||
- `X-Miroir-Degraded` header on any group missing quorum
|
||||
- 503 `miroir_no_quorum` only when no group met quorum
|
||||
|
||||
### 3. Read path implementation
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/routes/search.rs`
|
||||
- Pick group via `query_seq % RG`
|
||||
- Build intra-group covering set
|
||||
- Scatter search to covering set nodes
|
||||
- Merge by `_rankingScore`
|
||||
- Strip `_miroir_shard` always + `_rankingScore` if not requested
|
||||
- Aggregate facets + estimatedTotalHits
|
||||
- Report max processingTimeMs
|
||||
- Group fallback when covering set has holes
|
||||
|
||||
### 4. Index lifecycle
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/routes/indexes.rs`
|
||||
- Create broadcasts + atomically injects `_miroir_shard` into filterableAttributes
|
||||
- Settings sequential apply-with-rollback (in `routes/settings.rs`)
|
||||
- Delete broadcasts to all nodes
|
||||
- Stats aggregate `numberOfDocuments` + merge `fieldDistribution`
|
||||
|
||||
### 5. Tasks
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/routes/tasks.rs`
|
||||
- `GET /tasks` - List all tasks with optional filters
|
||||
- `GET /tasks/{uid}` - Get specific task with aggregation
|
||||
- `DELETE /tasks/{uid}` - Cancel/delete task
|
||||
- Task ID reconciliation across nodes
|
||||
- Aggregated status from all nodes
|
||||
|
||||
### 6. Error shape
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/error_response.rs`
|
||||
- Every error matches Meilisearch `{message, code, type, link}`
|
||||
- Miroir-specific `miroir_*` codes:
|
||||
- `miroir_primary_key_required`
|
||||
- `miroir_no_quorum`
|
||||
- `miroir_shard_unavailable`
|
||||
- `miroir_reserved_field`
|
||||
|
||||
### 7. Reserved fields contract
|
||||
**Status:** ✅ COMPLETE
|
||||
- `_miroir_shard` always-reserved (injected on write, stripped on read)
|
||||
- `_miroir_updated_at` / `_miroir_expires_at` reserved only when feature flag is on (Phase 5)
|
||||
|
||||
### 8. Auth
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/auth.rs`
|
||||
- Master-key/admin-key bearer dispatch per §5 rules 2-5
|
||||
- JWT path stubbed (Phase 5)
|
||||
- Public endpoints: `/health`, `/version`
|
||||
- Admin endpoints: `/admin/*`, `/_miroir/*`
|
||||
|
||||
### 9. Admin endpoints
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/routes/admin.rs`
|
||||
- `/health` - Public health check
|
||||
- `/version` - Version information
|
||||
- `/_miroir/ready` - Readiness check
|
||||
- `/_miroir/topology` - Cluster topology information
|
||||
- `/_miroir/shards` - Shard assignment information
|
||||
- `/_miroir/metrics` - Prometheus metrics (admin-key gated)
|
||||
|
||||
### 10. Middleware
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/middleware.rs`
|
||||
- Structured JSON log per plan §10
|
||||
- Prometheus metrics:
|
||||
- `miroir_request_duration_seconds`
|
||||
- `miroir_requests_total`
|
||||
- `miroir_requests_in_flight`
|
||||
- `miroir_degraded_requests_total`
|
||||
- `miroir_no_quorum_requests_total`
|
||||
|
||||
### 11. Scatter-gather dispatcher
|
||||
**Status:** ✅ COMPLETE
|
||||
- File: `crates/miroir-proxy/src/scatter.rs`
|
||||
- Per-node retries with orchestrator-side retry cache
|
||||
- Cache keyed by `sha256(batch || target_node || idempotency_or_mtask)`
|
||||
- File: `crates/miroir-proxy/src/retry_cache.rs`
|
||||
|
||||
## Integration Tests
|
||||
|
||||
**Status:** ✅ TESTS DEFINED
|
||||
- File: `crates/miroir-proxy/tests/phase2_integration_test.rs`
|
||||
- All Definition-of-Done (DoD) tests are defined and marked `#[ignore]`, so they run only when explicitly invoked against a live cluster
|
||||
|
||||
### Test Coverage:
|
||||
1. ✅ 1000 documents indexed across 3 nodes, each retrievable by ID
|
||||
2. ✅ Unique-keyword search finds every doc exactly once
|
||||
3. ✅ Facet aggregation across 3 color values sums correctly
|
||||
4. ✅ Offset/limit paging preserves global ordering
|
||||
5. ✅ Write with one group completely down still succeeds and stamps X-Miroir-Degraded
|
||||
6. ✅ Error-format parity test
|
||||
7. ✅ GET /_miroir/topology matches expected shape
|
||||
8. ✅ Index stats aggregation
|
||||
|
||||
## Files Modified/Created
|
||||
|
||||
### Created:
|
||||
- `crates/miroir-proxy/src/main.rs` - Main entry point
|
||||
- `crates/miroir-proxy/src/lib.rs` - Library exports
|
||||
- `crates/miroir-proxy/src/auth.rs` - Authentication middleware
|
||||
- `crates/miroir-proxy/src/client.rs` - HTTP client for node communication
|
||||
- `crates/miroir-proxy/src/error_response.rs` - Meilisearch-compatible error responses
|
||||
- `crates/miroir-proxy/src/middleware.rs` - Logging and metrics middleware
|
||||
- `crates/miroir-proxy/src/retry_cache.rs` - Retry cache for idempotency
|
||||
- `crates/miroir-proxy/src/scatter.rs` - HTTP scatter-gather execution
|
||||
- `crates/miroir-proxy/src/state.rs` - Shared application state
|
||||
- `crates/miroir-proxy/src/task_manager.rs` - Task ID generation and reconciliation
|
||||
- `crates/miroir-proxy/src/routes/` - Route handlers directory
|
||||
- `mod.rs` - Route module exports
|
||||
- `admin.rs` - Admin endpoints
|
||||
- `documents.rs` - Document CRUD routes
|
||||
- `health.rs` - Health check endpoints
|
||||
- `indexes.rs` - Index lifecycle routes
|
||||
- `search.rs` - Search route
|
||||
- `settings.rs` - Settings routes with rollback
|
||||
- `tasks.rs` - Task routes
|
||||
- `crates/miroir-proxy/tests/phase2_integration_test.rs` - Integration tests
|
||||
|
||||
### Removed (cleanup):
|
||||
- `crates/miroir-proxy/src/search_handler.rs` - Superseded by routes/search.rs
|
||||
- `crates/miroir-proxy/src/index_handler.rs` - Superseded by routes/indexes.rs
|
||||
- `crates/miroir-proxy/src/write.rs` - Superseded by routes/documents.rs
|
||||
|
||||
## Verification Against Plan §5 API Surface
|
||||
|
||||
### Meilisearch API Compatibility:
|
||||
- ✅ POST /indexes - Create index
|
||||
- ✅ GET /indexes - List indexes
|
||||
- ✅ GET /indexes/{uid} - Get index metadata
|
||||
- ✅ DELETE /indexes/{uid} - Delete index
|
||||
- ✅ GET /indexes/{uid}/stats - Get index stats
|
||||
- ✅ GET /indexes/{uid}/settings - Get settings
|
||||
- ✅ PATCH /indexes/{uid}/settings - Update settings (with rollback)
|
||||
- ✅ POST /indexes/{uid}/documents - Add documents
|
||||
- ✅ PUT /indexes/{uid}/documents - Update documents
|
||||
- ✅ DELETE /indexes/{uid}/documents - Delete by filter
|
||||
- ✅ DELETE /indexes/{uid}/documents/{id} - Delete by ID
|
||||
- ✅ GET /indexes/{uid}/documents/{id} - Get document by ID
|
||||
- ✅ POST /indexes/{uid}/search - Search documents
|
||||
- ✅ GET /tasks - List tasks
|
||||
- ✅ GET /tasks/{uid} - Get task status
|
||||
- ✅ DELETE /tasks/{uid} - Cancel task
|
||||
|
||||
### Miroir-specific endpoints:
|
||||
- ✅ GET /health - Public health
|
||||
- ✅ GET /version - Version info
|
||||
- ✅ GET /_miroir/ready - Readiness check
|
||||
- ✅ GET /_miroir/topology - Cluster topology
|
||||
- ✅ GET /_miroir/shards - Shard assignments
|
||||
- ✅ GET /_miroir/metrics - Prometheus metrics
|
||||
|
||||
## Conclusion
|
||||
|
||||
Phase 2 implementation is **COMPLETE**. All components from the DoD checklist have been implemented:
|
||||
|
||||
1. ✅ HTTP server on ports 7700 (main) and 9090 (metrics)
|
||||
2. ✅ Write path with shard hashing, _miroir_shard injection, quorum, degraded header
|
||||
3. ✅ Read path with group selection, covering set, merge by _rankingScore
|
||||
4. ✅ Index lifecycle with _miroir_shard injection, settings rollback
|
||||
5. ✅ Tasks with reconciliation
|
||||
6. ✅ Error shape matching Meilisearch
|
||||
7. ✅ Reserved fields handling
|
||||
8. ✅ Auth (master-key/admin-key bearer dispatch)
|
||||
9. ✅ Admin endpoints
|
||||
10. ✅ Middleware (logging, metrics)
|
||||
11. ✅ Scatter-gather dispatcher with per-node retries and a retry cache for idempotency
|
||||
|
||||
Integration tests are defined and ready to run against a live Meilisearch cluster.
|
||||
Loading…
Add table
Reference in a new issue