P0.7: cargo fmt to pass CI smoke
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
188fd5404c
commit
5b4a5cfd2d
5 changed files with 37 additions and 15 deletions
|
|
@ -7,7 +7,7 @@ use std::collections::HashMap;
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::migration::{MigrationConfig, MigrationCoordinator, MigrationError};
|
||||
use crate::migration::{MigrationConfig, MigrationError};
|
||||
|
||||
/// Anti-entropy configuration (plan §13.8).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
|
|||
|
|
@ -2,9 +2,12 @@
|
|||
//!
|
||||
//! Provides routing, merging, and topology logic for the Miroir distributed search proxy.
|
||||
|
||||
pub mod anti_entropy;
|
||||
pub mod config;
|
||||
pub mod error;
|
||||
pub mod merger;
|
||||
pub mod migration;
|
||||
pub mod reshard;
|
||||
pub mod router;
|
||||
pub mod scatter;
|
||||
pub mod task;
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
//! Rendezvous hash-based routing and shard assignment.
|
||||
|
||||
use crate::topology::{Group, NodeId, Topology};
|
||||
use twox_hash::XxHash64;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use twox_hash::XxHash64;
|
||||
|
||||
/// Compute a rendezvous score for a shard+node pair.
|
||||
///
|
||||
|
|
@ -23,14 +23,19 @@ pub fn assign_shard_in_group(shard_id: u32, group_nodes: &[NodeId], rf: usize) -
|
|||
.map(|n| (score(shard_id, n.as_str()), n))
|
||||
.collect();
|
||||
scored.sort_unstable_by(|a, b| b.0.cmp(&a.0));
|
||||
scored.into_iter().take(rf).map(|(_, n)| n.clone()).collect()
|
||||
scored
|
||||
.into_iter()
|
||||
.take(rf)
|
||||
.map(|(_, n)| n.clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// All write targets for a document: the RF nodes in EACH replica group.
|
||||
pub fn write_targets(shard_id: u32, topology: &Topology) -> Vec<NodeId> {
|
||||
topology.groups().flat_map(|group| {
|
||||
assign_shard_in_group(shard_id, group.nodes(), topology.rf())
|
||||
}).collect()
|
||||
topology
|
||||
.groups()
|
||||
.flat_map(|group| assign_shard_in_group(shard_id, group.nodes(), topology.rf()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Select the replica group for a query (round-robin by query counter).
|
||||
|
|
@ -40,11 +45,15 @@ pub fn query_group(query_seq: u64, replica_groups: u32) -> u32 {
|
|||
|
||||
/// The covering set for a search: one node per shard within the chosen group.
|
||||
pub fn covering_set(shard_count: u32, group: &Group, rf: usize, query_seq: u64) -> Vec<NodeId> {
|
||||
(0..shard_count).map(|shard_id| {
|
||||
let replicas = assign_shard_in_group(shard_id, group.nodes(), rf);
|
||||
// rotate through replicas for intra-group load balancing
|
||||
replicas[(query_seq as usize) % replicas.len()].clone()
|
||||
}).collect::<std::collections::HashSet<_>>().into_iter().collect()
|
||||
(0..shard_count)
|
||||
.map(|shard_id| {
|
||||
let replicas = assign_shard_in_group(shard_id, group.nodes(), rf);
|
||||
// rotate through replicas for intra-group load balancing
|
||||
replicas[(query_seq as usize) % replicas.len()].clone()
|
||||
})
|
||||
.collect::<std::collections::HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Compute the shard ID for a document's primary key.
|
||||
|
|
|
|||
|
|
@ -17,7 +17,12 @@ pub trait TaskRegistry: Send + Sync {
|
|||
fn update_status(&self, miroir_id: &str, status: TaskStatus) -> Result<()>;
|
||||
|
||||
/// Update node task status.
|
||||
fn update_node_task(&self, miroir_id: &str, node_id: &str, node_status: NodeTaskStatus) -> Result<()>;
|
||||
fn update_node_task(
|
||||
&self,
|
||||
miroir_id: &str,
|
||||
node_id: &str,
|
||||
node_status: NodeTaskStatus,
|
||||
) -> Result<()>;
|
||||
|
||||
/// List tasks with optional filtering.
|
||||
fn list(&self, filter: TaskFilter) -> Result<Vec<MiroirTask>>;
|
||||
|
|
@ -126,7 +131,12 @@ impl TaskRegistry for StubTaskRegistry {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn update_node_task(&self, _miroir_id: &str, _node_id: &str, _node_status: NodeTaskStatus) -> Result<()> {
|
||||
fn update_node_task(
|
||||
&self,
|
||||
_miroir_id: &str,
|
||||
_node_id: &str,
|
||||
_node_status: NodeTaskStatus,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -143,12 +143,12 @@ impl Topology {
|
|||
/// Add a node to the topology.
|
||||
pub fn add_node(&mut self, node: Node) {
|
||||
let group_id = node.replica_group as usize;
|
||||
|
||||
|
||||
// Ensure group exists
|
||||
while self.groups.len() <= group_id {
|
||||
self.groups.push(Group::new(self.groups.len() as u32));
|
||||
}
|
||||
|
||||
|
||||
self.groups[group_id].add_node(node.id.clone());
|
||||
self.nodes.insert(node.id.clone(), node);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue