diff --git a/Cargo.toml b/Cargo.toml index 312f46d..058b161 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/*", "crates/bpe/benchmarks", "crates/bpe/tests", + "crates/consistent-choose-k/benchmarks", ] resolver = "2" diff --git a/README.md b/README.md index 0dbe85f..1a61103 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ A collection of useful algorithms written in Rust. Currently contains: - [`geo_filters`](crates/geo_filters): probabilistic data structures that solve the [Distinct Count Problem](https://en.wikipedia.org/wiki/Count-distinct_problem) using geometric filters. - [`bpe`](crates/bpe): fast, correct, and novel algorithms for the [Byte Pair Encoding Algorithm](https://en.wikipedia.org/wiki/Large_language_model#BPE) which are particularly useful for chunking of documents. - [`bpe-openai`](crates/bpe-openai): Fast tokenizers for OpenAI token sets based on the `bpe` crate. +- [`consistent-choose-k`](crates/consistent-choose-k): constant time consistent hashing algorithms with support for replication and bounded load. - [`sparse-ngrams`](crates/sparse-ngrams): fast sparse n-gram extraction from byte slices. Selects variable-length n-grams (2–8 bytes) deterministically using bigram frequency priorities, suitable for substring search indexes. - [`string-offsets`](crates/string-offsets): converts string positions between bytes, chars, UTF-16 code units, and line numbers. Useful when sending string indices across language boundaries. diff --git a/crates/bpe-openai/Cargo.toml b/crates/bpe-openai/Cargo.toml index 9bc20e7..8c5a227 100644 --- a/crates/bpe-openai/Cargo.toml +++ b/crates/bpe-openai/Cargo.toml @@ -21,7 +21,7 @@ unicode-normalization = "0.1" [dev-dependencies] bpe = { version = "0.2", path = "../bpe", features = ["rand"] } -tiktoken-rs = "0.9" +tiktoken-rs = "0.11" [build-dependencies] base64 = "0.22" diff --git a/crates/bpe/Cargo.toml b/crates/bpe/Cargo.toml index beaef6c..ff1de4c 100644 --- a/crates/bpe/Cargo.toml +++ b/crates/bpe/Cargo.toml @@ -26,7 +26,7 @@ serde = { version = "1", features = ["derive"] } [dev-dependencies] bpe = { path = "." } -tiktoken-rs = "0.9" +tiktoken-rs = "0.11" [package.metadata.docs.rs] all-features = true diff --git a/crates/bpe/benchmarks/Cargo.toml b/crates/bpe/benchmarks/Cargo.toml index 6fb1a6a..368fb35 100644 --- a/crates/bpe/benchmarks/Cargo.toml +++ b/crates/bpe/benchmarks/Cargo.toml @@ -22,5 +22,5 @@ bpe = { path = "../../bpe", features = ["rand", "tiktoken"] } bpe-openai = { path = "../../bpe-openai" } criterion = "0.8" rand = "0.10" -tiktoken-rs = "0.9" -tokenizers = { version = "0.22", features = ["http"] } +tiktoken-rs = "0.11" +tokenizers = { version = "0.23", features = ["http"] } diff --git a/crates/bpe/tests/Cargo.toml b/crates/bpe/tests/Cargo.toml index 75ba2ea..640eb52 100644 --- a/crates/bpe/tests/Cargo.toml +++ b/crates/bpe/tests/Cargo.toml @@ -7,4 +7,4 @@ bpe = { path = "../../bpe", features = ["rand"] } bpe-openai = { path = "../../bpe-openai" } itertools = "0.14" rand = "0.10" -tiktoken-rs = "0.9" +tiktoken-rs = "0.11" diff --git a/crates/consistent-choose-k/Cargo.toml b/crates/consistent-choose-k/Cargo.toml new file mode 100644 index 0000000..d8341a8 --- /dev/null +++ b/crates/consistent-choose-k/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "consistent-choose-k" +version = "0.1.0" +edition = "2021" +description = "Stateless consistent choose-k hashing for replication, failover, and bounded-load placement." +repository = "https://github.com/github/rust-gems" +homepage = "https://github.com/github/rust-gems/tree/main/crates/consistent-choose-k" +documentation = "https://docs.rs/consistent-choose-k" +readme = "README.md" +license = "MIT" +keywords = ["consistent", "hashing", "replication", "choose-k", "sampling"] +categories = ["algorithms", "data-structures", "mathematics", "science"] + +[lib] +crate-type = ["lib", "staticlib"] +bench = false + +[dependencies] + +[dev-dependencies] diff --git a/crates/consistent-choose-k/README.md b/crates/consistent-choose-k/README.md new file mode 100644 index 0000000..4270d68 --- /dev/null +++ b/crates/consistent-choose-k/README.md @@ -0,0 +1,197 @@ +# Consistent Hashing + +Consistent hashing maps keys to a changing set of nodes (shards, servers) so that when nodes join or leave, only a small fraction of keys move. It is used in distributed caches, databases, object stores, and load balancers to achieve scalability and high availability with minimal data reshuffling. + +Common algorithms +- [Consistent hashing](https://en.wikipedia.org/wiki/Consistent_hashing) (hash ring with virtual nodes) +- [Rendezvous hashing](https://en.wikipedia.org/wiki/Rendezvous_hashing) +- [Jump consistent hash](https://arxiv.org/pdf/1406.2294) +- [Maglev hashing](https://research.google/pubs/pub44824) +- [AnchorHash: A Scalable Consistent Hash](https://arxiv.org/abs/1812.09674) +- [DXHash](https://arxiv.org/abs/2107.07930) +- [JumpBackHash](https://arxiv.org/abs/2403.18682) + +## Core idea + +Many consistent-hashing algorithms are best understood as specialized solutions +to one higher-level problem: primary placement, replication, bounded load, +failover, or arbitrary deletions. A single flat complexity table is often +misleading because those algorithms do not all expose the same operations. + +This crate instead centers on `ConsistentChooseK`: a stateless per-key ranking +of all nodes. The first item is the primary owner, the first `R` items are +replicas, the next item after a failed node is its failover target, and the same +ranking can drive bounded-load and deletion-tolerant assignment. The current +implementation extracts the first `R` distinct candidates in `O(R^2)` time +(`O(R log R)` with a heap optimization) and uses no persistent memory. + +Replication of keys +- Hash ring: replicate by walking clockwise to the next R distinct nodes. Virtual nodes help spread replicas more evenly. Replicas are not independently distributed. +- Rendezvous hashing: replicate by selecting the top R nodes by score for the key. This naturally yields R distinct owners and supports weights. +- Jump consistent hash: the base function doesn't support replication. While the math can be modified to support consistent replication, it cannot be efficiently solved for large k and even for small k (=2 or =3), a quadratic or cubic equation has to be solved. +- JumpBackHash and variants: The trick of Jump consistent hash to support replication won't work here due to the introduction of additional state. +- ConsistentChooseK: produces an ordered list of distinct, consistent candidates directly, making replication and related higher-level policies simple compositions over the same primitive. + +Why replication matters +- Tolerates node failures and maintenance without data unavailability. +- Distributes read/write load across multiple owners, reducing hotspots. +- Enables fast recovery and higher tail-latency resilience. + +## Applications beyond replication + +The `ConsistentChooseK` iterator produces a per-key ranking of all `n` nodes in priority order — consistently and with zero memory overhead. This ranking is a strict superset of simple replication and enables drop-in replacements for several well-known algorithms that traditionally require maintaining expensive data structures such as hash rings. + +### Bounded-load consistent hashing + +[Consistent Hashing with Bounded Loads](https://research.google/pubs/pub46580/) (Mirrokni et al., 2018) caps the maximum load any single node may receive. When a key's preferred node is full, it overflows to the next candidate. Classic implementations walk a hash ring to find successors, requiring O(V·N) memory for the ring where V is the number of virtual nodes per physical node (typically V > 100–200 for acceptable load variance). Lookups cost O(log(V·N)) via binary search. + +With `ConsistentChooseK`, the ranking iterator directly yields each key's preference list on the fly — no ring required. Assignment becomes: iterate tokens round by round, and for each token advance its ranking iterator until a node with remaining capacity is found. This achieves the same bounded-load guarantees with O(k) for k keys and O(k) time to extract the k-th key. + +See [`examples/bounded_load.rs`](examples/bounded_load.rs) for a working implementation. + +### Power of two choices + +The [power of two choices](https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf) paradigm (Mitzenmacher, 2001; Azar et al., 1999) assigns each key to the least-loaded of two (or d) randomly chosen nodes. This reduces maximum load from O(log n / log log n) to O(log log n / log d) with high probability. + +Traditionally this requires drawing d independent random nodes per key. However, the original algorithm ignores the corner case where multiple independent hash functions collide on the same node, effectively reducing the number of distinct choices below d. With `ConsistentChooseK`, the first d elements from the ranking iterator are guaranteed to be distinct nodes. The choices are also consistent across time — the same key always considers the same d candidates — so reassignment only happens when a node actually joins or leaves. + +### Priority-based failover + +In active-passive or tiered architectures, each key needs a deterministic failover order. The ranking iterator provides exactly this: the first node is the primary, the second is the hot standby, and so on. When a node fails, the next node in the ranking takes over — consistently for all keys that had the failed node at the same rank position, and without any coordination or ring rebalancing. + +### Deletion-tolerant node maps + +`ConsistentNodeMap` uses the `ConsistentChooseK` ranking to support arbitrary node deletions with very small state. It stores only the total slot count and the set of deleted slots. Lookup generates the per-key choose-k ranking and returns the first slot that is not deleted. + +This solves the same deletion problem targeted by AnchorHash, MementoHash, and DxHash: when a node is removed, only keys assigned to that node move, and they are redistributed uniformly over the remaining nodes. The difference is that those algorithms keep replacement or redirect metadata that encodes enough of the removal history to repair hits on deleted nodes. `ConsistentNodeMap` is history-independent: it only needs the current deleted set. + +For many practical deployments, this also makes `ConsistentNodeMap` a compelling replacement for traditional hash-ring implementations with virtual nodes. Rings typically need hundreds of virtual nodes per physical node to obtain good balance, which makes their memory footprint orders of magnitude larger than the actual node set. Here the ranking is generated directly from the key, so deletion support only adds state proportional to the number of deleted slots rather than to a large virtual-node ring. + +The tradeoff is lookup work. If `h` deleted slots are encountered before the first live slot, the current iterator costs `O((h + 1)^2)` because producing the i-th choose-k candidate costs O(i). The expected number of deleted-node hits has the same harmonic/log behavior analyzed for history-based approaches, approximately `ln(total / active)` when `total` slots contain `active` live nodes. Thus the total expected lookup cost is `O((1 + ln(total / active))^2)`. + +| Algorithm | Total lookup time | Add node | Remove node | State | Predefined capacity? | History-dependent? | +|-----------|-------------------|----------|-------------|-------|----------------------|--------------------| +| Hash ring with `V` virtual nodes | `O(log(V·active))` | `O(V log(V·active))` | `O(V log(V·active))` | `O(V·active)` ring entries | No | No | +| `ConsistentNodeMap` | `O((h + 1)^2)`, expected `O((1 + ln(total / active))^2)` | `O(1)` expected | `O(1)` expected | `O(deleted)` deleted-slot set | No | No | +| AnchorHash | `O((h + 1)^2)`, expected `O((1 + ln(total / active))^2)` | `O(1)` expected | `O(1)` expected | `O(capacity)` anchor/removal state | Yes | Yes | +| MementoHash | `O((h + 1)^2)`, expected `O((1 + ln(total / active))^2)` | `O(1)` expected | `O(1)` expected | `O(deleted)` replacement tuples | No | Yes | +| DxHash | `O((h + 1)^2)`, expected `O((1 + ln(total / active))^2)` | `O(1)` expected | `O(1)` expected | `O(capacity)` redirect/displacement state with smaller constants than AnchorHash | Yes | Yes | + +## ConsistentChooseK algorithm + +The following functions summarize the core algorithmic innovation as a minimal Rust excerpt. +`n` is the number of nodes and `k` is the number of desired replica. +The chosen nodes are returned as distinct integers in the range `0..n`. + +``` +fn consistent_choose_k(key: Key, k: usize, n: usize) -> Vec { + (0..k).rev().scan(n, |n, k| Some(consistent_choose_max(key, k + 1, n))).collect() +} + +fn consistent_choose_max(key: Key, k: usize, n: usize) -> usize { + (0..k).map(|k| consistent_hash(key, k, n - k) + k).max() +} + +fn consistent_hash(key: Key, i: usize, n: usize) -> usize { + // compute the i-th independent consistent hash for `key` and `n` nodes. +} +``` + +`consistent_choose_k` makes `k` calls to `consistent_choose_max` which calls `consistent_hash` another `k` times. +In total, `consistent_hash` is called `k * (k+1) / 2` many times. Utilizing a `O(1)` solution for `consistent_hash` leads to a `O(k^2)` runtime. +This runtime can be further improved by replacing the max operation with a heap where popped elements are updated according to the new arguments `n` and `k`. +With this optimization, the complexity reduces to `O(k log k)`. +With some probabilistic bucketing strategy, it should be possible to reduce the expected runtime to `O(k)`. +For small `k` neither optimization is probably improving the actual performance though. + +The next section proves the correctness of this algorithm. + +## Relation to reservoir sampling + +`consistent_choose_k` solves the same distributional problem as +**reservoir sampling without replacement** — drawing a uniform `k`-subset from +`{0, …, n−1}` such that incrementing `n` evicts at most one element with +probability `k/(n+1)`. This is exactly the invariant maintained by Vitter's +classical streaming algorithms (Algorithm R, 1985; Algorithm L, Li 1994). + +The two approaches differ in *where the randomness lives* and *what queries +are cheap*: + +| | Algorithm L (streaming) | `consistent_choose_k` | +|--------------------------------|-----------------------------------|------------------------------------| +| Randomness source | fresh PRNG draws | deterministic `consistent_hash(key, …)` | +| State | `O(k)` reservoir + threshold `W` | none | +| Build sample for `n` | `O(k · (1 + log(n/k)))` (replay) | `O(k²)` or `O(k log k)`, no replay | +| Advance to next `n` | `O(1)` amortized (geometric skip) | `O(k)` via a `grow_n` step | + +In other words, `consistent_choose_k` is a **history-independent** analogue +of Algorithm L: + +- An `O(k)` `grow_n(key, k, n) → (Option, new_member?)` step would + mirror Algorithm L's geometric skip — advancing the active set from `n` to + `n+1` (or directly to the next `n` that actually changes the sample). The + ingredients are already there: the recursion `S(k, n+1)` differs from + `S(k, n)` in at most one element (Property 3), and that element is determined + by which level of the `consistent_choose_max` recursion the new `n` enters. +- Unlike Algorithm L, the active set for **any** `n` can also be recomputed + from scratch in `O(k²)` (or `O(k log k)`) without walking the prefix `1..n` — because the + randomness for level `i` is materialized by `consistent_hash(key, i, n−i)` + rather than threaded through a running PRNG state. This is what makes + `consistent_choose_k` usable as a *consistent hashing* primitive: any node + can compute the assignment for the current cluster size in isolation. + +So the same construction simultaneously gives a stateless consistent-hashing +ranking and a fully reproducible, addressable reservoir sample. Conversely, +reservoir sampling provides another way to view consistent hashing with +replication: it is the unique `k`-out-of-`n` growth process where each new node +joins the active set with probability `k/(n+1)`, evicts at most one old node, +and preserves a uniform active set after every growth step. + +## N-Choose-K replication + +We define the consistent `n-choose-k` replication as follows: + +1. For a given number `n` of nodes, choose `k` distinct nodes `S`. +2. For a given `key` the chosen set of nodes must be uniformly chosen from all possible sets of size `k`. +3. When `n` increases by one, at most one node in the chosen set will be changed. +4. and the node will be changed with probability `k/(n+1)`. + +In the remainder of this section we prove that the `consistent_choose_k` algorithm satisfies those properties. + +Let's define `M(k,n) = consistent_choose_max(_, k, n)` and `S(k, n) := consistent_choose_k(_, k, n)` as short-cuts for some arbitrary fixed `key`. +We assume that `consistent_hash(key, k, n)` computes `k` independent consistent hash functions. + +### Property 1 + +Since `M(k, n) < n` and `S(k, n) = {M(k, n)} ∪ S(k - 1, M(k, n))` for `k > 1`, `S(k, n)` constructs a strictly monotonically decreasing sequence. The sequence outputs exactly `k` elements which therefore must all be distinct which proves property 1 for `k <= n`. + +Properties 2, 3, and 4 can be proven via induction as follows. + +### Property 4 + +`k = 1`: We expect that `consistent_hash` returns a single uniformly distributed node index which is consistent in `n`, i.e. changes the hash value with probability `1/(n+1)`, when `n` increments by one. In our implementation, we use an `O(1)` implementation of the jump-hash algorithm. For `k=1`, `consistent_choose_k(key, 1, n)` becomes a single function call to `consistent_choose_max(key, 1, n)` which in turn calls `consistent_hash(key, 0, n)`. I.e. `consistent_choose_k` inherits all the desired properties from `consistent_hash` for `k=1` and all `n>=1`. + +`k → k+1`: `M(k+1, n+1) = M(k+1, n)` iff `M(k, n+1) < n` and `consistent_hash(_, k, n+1-k) < n - k`. The probability for the former is `(n+1-k)/(n+1)` by induction and `(n-k)/(n+1-k)` for the latter by the assumption that `consistent_hash` is a proper consistent hash function. Since both these probabilities are assumed to be independent, the probability that our initial value changes is `1 - (n+1-k)/(n+1) * (n-k)/(n+1-k) = 1 - (n-k)/(n+1) = (k+1)/(n+1)` proving property 4. + +### Property 3 + +Property 3 is trivially satisfied if `S(k+1, n+1) = S(k+1, n)`. So, we focus on the case where `S(k+1, n+1) != S(k+1, n)`, which implies that `n ∈ S(k+1, n+1)` as largest element. +We know that `S(k+1, n) = {m} ∪ S(k, m)` for some `m` by definition and `S(k, n) = S(k, u) ∖ {v} ∪ {w}` by induction for some `u`, `v`, and `w`. Thus far we have `S(k+1, n+1) = {n} ∪ S(k, n) = {n} ∪ S(k, u) ∖ {v} ∪ {w}`. + +If `u = m`, then `S(k+1, n) = {m} ∪ S(k, m) ∖ {v} ∪ {w}` and `S(k+1, n+1) = {n} ∪ S(k, n) = {n} ∪ S(k, m) ∖ {v} ∪ {w}` and the two differ exactly in the elements `m` and `n` proving property 3. + +If `u ≠ m`, then `consistent_hash(_, k, n) = m`, since that's the only way how the largest values in `S(k+1, n)` and `S(k, n)` can differ. In this case, `m ∉ S(k+1, n+1)`, since `n` (and not `m`) is the largest element of `S(k+1, n+1)`. Furthermore, `S(k, n) = S(k, m)`, since `consistent_hash(_, i, n) < m` for all `i < k` (otherwise there is a contradiction). +Putting it together leads to `S(k+1, n+1) = {n} ∪ S(k, m)` and `S(k+1, n) = {m} ∪ S(k, m)` which differ exactly in the elements `n` and `m` which concludes the proof. + +### Property 2 + +The final part is to prove property 2. This time we have an inducation over `k` and `n`. +As before, the base case of the induction for `k=1` and all `n>0` is inherited from the `consistency_hash` implementation. The case `n=k` is also trivially covered, since the only valid set are the numbers `{0, ..., k-1}` which the algorithm correctly outputs. So, we only need to care about the induction step where `k>1` and `n>k`. + +We need to prove that `P(i ∈ S(k+1, n+1)) = (k+1)/(n+1)` for all `0 <= i <= n`. Property 3 already proves the case `i = n`. Furthermore we know that `P(n ∈ S(k+1, n+1)) = (k+1)/(n+1)` and vice versa `P(n ∉ S(k+1, n+1)) = 1 - (k+1)/(n+1)`. Let's consider those two cases separately. + +`n ∈ S(k+1, n+1)`: By the definition of `S`, we know that `S(k+1, n+1) = {n} ∪ S(k, n)`. `P(i ∈ S(k+1, n+1)) = P(i ∈ S(k, n)) P(n ∈ S(k+1, n+1)) = k/n * (k+1)/(n+1)` for all `0 <= i < n`. + +`n ∉ S(k+1, n+1)`: Once more by definition, `S(k+1, n+1) = S(k+1, n)` in this case. `P(i ∈ S(k+1, n+1)) = P(i ∈ S(k+1, n)) P(n ∉ S(k+1, n+1)) = (k+1)/n * (1 - (k+1)/(n+1))` for all `0 <= i < n`. + +Summing both cases together leads to `P(i ∈ S(k+1, n+1)) = k/n * (k+1)/(n+1) + (k+1)/n * (1 - (k+1)/(n+1)) = k/n * (k+1)/(n+1) + k/n * (1 - (k+1)/(n+1)) + 1/n * (1 - (k+1)/(n+1)) = k/n * (k+1)/(n+1) + k/n - k/n * (k+1)/(n+1) + 1/n - 1/n * (k+1)/(n+1) = k/n + 1/n - 1/n * (k+1)/(n+1) = (k+1)/n - (k+1)/(n+1)/n = (k+1)/n * (1 - 1/(n+1)) = (k+1)/(n+1)` for all `0 <= i < n` which concludes the proof. diff --git a/crates/consistent-choose-k/benchmarks/Cargo.toml b/crates/consistent-choose-k/benchmarks/Cargo.toml new file mode 100644 index 0000000..7b17102 --- /dev/null +++ b/crates/consistent-choose-k/benchmarks/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "consistent-choose-k-benchmarks" +edition = "2021" + +[[bench]] +name = "performance" +path = "performance.rs" +harness = false +test = false + +[dependencies] +consistent-choose-k = { path = "../" } + +criterion = { version = "0.8", features = ["csv_output"] } +rand = "0.10" diff --git a/crates/consistent-choose-k/benchmarks/criterion.toml b/crates/consistent-choose-k/benchmarks/criterion.toml new file mode 100644 index 0000000..0e43927 --- /dev/null +++ b/crates/consistent-choose-k/benchmarks/criterion.toml @@ -0,0 +1,18 @@ +# save report in this directory, even if a custom target directory is set +criterion_home = "./target/criterion" + +# The colors table allows users to configure the colors used by the charts +# cargo-criterion generates. +[colors] +# Color-blind friendly color scheme from https://personal.sron.nl/~pault/. +comparison_colors = [ + {r = 51, g = 34, b = 136 }, # indigo + {r = 136, g = 204, b = 238 }, # cyan + {r = 68, g = 170, b = 153 }, # teal + {r = 17, g = 119, b = 51 }, # green + {r = 153, g = 153, b = 51 }, # olive + {r = 221, g = 204, b = 119 }, # sand + {r = 204, g = 102, b = 119 }, # rose + {r = 136, g = 34, b = 85 }, # wine + {r = 170, g = 68, b = 153 }, # purple +] diff --git a/crates/consistent-choose-k/benchmarks/performance.rs b/crates/consistent-choose-k/benchmarks/performance.rs new file mode 100644 index 0000000..bf4cce1 --- /dev/null +++ b/crates/consistent-choose-k/benchmarks/performance.rs @@ -0,0 +1,80 @@ +use std::{ + hash::{DefaultHasher, Hash}, + hint::black_box, + time::Duration, +}; + +use consistent_choose_k::{ConsistentChooseKHasher, ConsistentHasher}; +use criterion::{ + criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, PlotConfiguration, + Throughput, +}; +use rand::{rng, RngExt}; + +fn throughput_benchmark(c: &mut Criterion) { + let keys: Vec = rng().random_iter().take(1000).collect(); + + let mut group = c.benchmark_group("choose"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + for n in [1usize, 10, 100, 1000, 10000] { + group.throughput(Throughput::Elements(keys.len() as u64)); + group.bench_with_input(BenchmarkId::new("1", n), &n, |b, n| { + b.iter(|| { + for key in &keys { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + black_box(ConsistentHasher::new(h).prev(*n + 1)); + } + }) + }); + for k in [1, 2, 3, 10, 100] { + group.throughput(Throughput::Elements((keys.len() * k) as u64)); + group.bench_with_input(BenchmarkId::new(format!("k_{k}"), n), &n, |b, n| { + b.iter(|| { + for key in &keys { + let mut h = DefaultHasher::default(); + key.hash(&mut h); + black_box(ConsistentChooseKHasher::new_with_k(h, *n + k, k)); + } + }) + }); + } + } + group.finish(); +} + +fn append_vs_new_with_k(c: &mut Criterion) { + let mut group = c.benchmark_group("append_vs_new_with_k"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + for n in [10usize, 100, 1000, 10000] { + for k in [2, 3, 10, 100] { + group.bench_function(BenchmarkId::new(format!("new_with_k/k_{k}"), n), |b| { + b.iter(|| { + let h = DefaultHasher::default(); + black_box(ConsistentChooseKHasher::new_with_k(h, n + k, k)); + }) + }); + group.bench_function(BenchmarkId::new(format!("append/k_{k}"), n), |b| { + b.iter(|| { + let h = DefaultHasher::default(); + let mut iter = ConsistentChooseKHasher::new(h, n + k); + for _ in 0..k { + black_box(iter.grow_k()); + } + }) + }); + } + } + group.finish(); +} + +criterion_group!( + name = benches; + config = Criterion::default() + .warm_up_time(Duration::from_millis(500)) + .measurement_time(Duration::from_millis(4000)) + .nresamples(1000); + + targets = throughput_benchmark, append_vs_new_with_k, +); +criterion_main!(benches); diff --git a/crates/consistent-choose-k/examples/bounded_load.rs b/crates/consistent-choose-k/examples/bounded_load.rs new file mode 100644 index 0000000..8ed6645 --- /dev/null +++ b/crates/consistent-choose-k/examples/bounded_load.rs @@ -0,0 +1,296 @@ +//! Bounded-load consistent hashing example. +//! +//! Compares unbounded vs bounded-load assignment across many random seeds, +//! reporting average and standard deviation of load spread and consistency +//! (fraction of assignments that change when a node is added). +//! +//! Bounded assignment iterates over tokens sequentially, greedily assigning +//! each token its k most-preferred nodes that still have capacity. Using +//! round-robin (all tokens claim one replica per round) yields nearly +//! identical churn numbers with marginally better load spread. +//! +//! Run with: cargo run --example bounded_load + +use std::collections::HashSet; +use std::hash::{DefaultHasher, Hash, Hasher}; +use std::time::Instant; + +use consistent_choose_k::ConsistentChooseKHasher; + +/// Bounded-load assignment. +/// +/// Each token claims all k replicas before moving to the next token, +/// skipping any node that has reached `max_load`. +fn bounded_load_assign>( + iters: impl IntoIterator, + k: usize, + n: usize, + max_load: usize, +) -> (Vec>, Vec) { + let mut load = vec![0usize; n]; + let mut assignments = Vec::new(); + + for mut iter in iters { + let mut assigned = Vec::with_capacity(k); + for node in iter.by_ref() { + if load[node] < max_load { + load[node] += 1; + assigned.push(node); + if assigned.len() == k { + break; + } + } + } + assignments.push(assigned); + } + (assignments, load) +} + +/// Count the number of assignments that changed between two runs. +fn count_churn(before: &[Vec], after: &[Vec]) -> usize { + before + .iter() + .zip(after.iter()) + .map(|(b, a)| b.iter().filter(|node| !a.contains(node)).count()) + .sum() +} + +/// Standard deviation of load across machines. +fn load_stddev(load: &[usize]) -> f64 { + let mean = load.iter().sum::() as f64 / load.len() as f64; + let var = load.iter().map(|&x| (x as f64 - mean).powi(2)).sum::() / load.len() as f64; + var.sqrt() +} + +/// A hash ring with `v` virtual nodes per physical node. +struct HashRing { + ring: Vec<(u64, usize)>, +} + +impl HashRing { + fn new(seed: u64, n: usize, v: usize) -> Self { + let mut ring: Vec<(u64, usize)> = (0..n) + .flat_map(|node| { + (0..v).map(move |vi| { + let mut h = DefaultHasher::default(); + seed.hash(&mut h); + node.hash(&mut h); + vi.hash(&mut h); + (h.finish(), node) + }) + }) + .collect(); + ring.sort_unstable_by_key(|&(pos, _)| pos); + Self { ring } + } + + /// Return an iterator over distinct physical nodes for the given token hash, + /// walking clockwise from the token's position on the ring. + fn iter(&self, token_hash: u64) -> HashRingIter<'_> { + let start = self.ring.partition_point(|&(pos, _)| pos < token_hash); + HashRingIter { + ring: &self.ring, + start, + offset: 0, + seen: HashSet::new(), + } + } +} + +/// Iterator that walks a hash ring clockwise, yielding distinct physical nodes. +struct HashRingIter<'a> { + ring: &'a [(u64, usize)], + start: usize, + offset: usize, + seen: HashSet, +} + +impl Iterator for HashRingIter<'_> { + type Item = usize; + + fn next(&mut self) -> Option { + while self.offset < self.ring.len() { + let (_, node) = self.ring[(self.start + self.offset) % self.ring.len()]; + self.offset += 1; + if self.seen.insert(node) { + return Some(node); + } + } + None + } +} + +fn hasher_for_seed_and_key(seed: u64, key: u64) -> DefaultHasher { + let mut h = DefaultHasher::default(); + seed.hash(&mut h); + let seed_state = h.finish(); + let mut h2 = DefaultHasher::default(); + seed_state.hash(&mut h2); + key.hash(&mut h2); + h2 +} + +struct Stats { + sum: f64, + sum_sq: f64, + count: f64, +} + +impl Stats { + fn new() -> Self { + Self { + sum: 0.0, + sum_sq: 0.0, + count: 0.0, + } + } + + fn push(&mut self, x: f64) { + self.sum += x; + self.sum_sq += x * x; + self.count += 1.0; + } + + fn mean(&self) -> f64 { + self.sum / self.count + } + + fn stddev(&self) -> f64 { + (self.sum_sq / self.count - self.mean().powi(2)) + .max(0.0) + .sqrt() + } +} + +const VIRTUAL_NODES: usize = 200; + +fn run(num_tokens: usize, k: usize, n: usize, num_seeds: u64) { + let total = num_tokens * k; + let cap = total.div_ceil(n) + 1; + + println!("Parameters: {num_tokens} tokens, k={k} replicas, {n} machines, {num_seeds} seeds"); + println!("Total assignments: {total}, capacity cap per machine: {cap}"); + println!( + "Perfect balance: {}×{} + {}×{}", + n - total % n, + total / n, + total % n, + total / n + 1 + ); + println!(); + + let mut ub_spread = Stats::new(); + let mut b_spread = Stats::new(); + let mut ring_spread = Stats::new(); + let mut ub_changes = Stats::new(); + let mut b_changes = Stats::new(); + let mut ring_changes = Stats::new(); + let mut ub_time_us = 0u128; + let mut b_time_us = 0u128; + let mut ring_time_us = 0u128; + + for seed in 0..num_seeds { + // ── Choose-k (unbounded) ───────────────────────────────────────── + let make_iters = |n| { + (0..num_tokens as u64) + .map(move |key| ConsistentChooseKHasher::new(hasher_for_seed_and_key(seed, key), n)) + }; + let t = Instant::now(); + let (unbounded, ub_load) = bounded_load_assign(make_iters(n), k, n, usize::MAX); + ub_time_us += t.elapsed().as_micros(); + ub_spread.push(load_stddev(&ub_load)); + + // ── Choose-k (bounded) ─────────────────────────────────────────── + let t = Instant::now(); + let (bounded, b_load) = bounded_load_assign(make_iters(n), k, n, cap); + b_time_us += t.elapsed().as_micros(); + b_spread.push(load_stddev(&b_load)); + + // ── Hash ring (bounded) ────────────────────────────────────────── + let ring = HashRing::new(seed, n, VIRTUAL_NODES); + let t = Instant::now(); + let (ring_assign, r_load) = bounded_load_assign( + (0..num_tokens as u64) + .map(|key| ring.iter(hasher_for_seed_and_key(seed, key).finish())), + k, + n, + cap, + ); + ring_time_us += t.elapsed().as_micros(); + ring_spread.push(load_stddev(&r_load)); + + // ── Consistency: add one machine ───────────────────────────────── + let n2 = n + 1; + let cap2 = total.div_ceil(n2) + 1; + + let (unbounded2, _) = bounded_load_assign(make_iters(n2), k, n2, usize::MAX); + ub_changes.push(count_churn(&unbounded, &unbounded2) as f64 / total as f64 * 100.0); + + let (bounded2, _) = bounded_load_assign(make_iters(n2), k, n2, cap2); + b_changes.push(count_churn(&bounded, &bounded2) as f64 / total as f64 * 100.0); + + let ring2 = HashRing::new(seed, n2, VIRTUAL_NODES); + let (ring_assign2, _) = bounded_load_assign( + (0..num_tokens as u64) + .map(|key| ring2.iter(hasher_for_seed_and_key(seed, key).finish())), + k, + n2, + cap2, + ); + ring_changes.push(count_churn(&ring_assign, &ring_assign2) as f64 / total as f64 * 100.0); + } + + println!( + "{:<24} {:>16} {:>16} {:>16}", + "", "Choose-k", "Bounded", "Ring Bounded" + ); + println!("{:-<24} {:->16} {:->16} {:->16}", "", "", "", ""); + println!( + "{:<24} {:>11.2} ± {:<5.2} {:>10.2} ± {:<5.2} {:>10.2} ± {:<5.2}", + "Load stddev", + ub_spread.mean(), + ub_spread.stddev(), + b_spread.mean(), + b_spread.stddev(), + ring_spread.mean(), + ring_spread.stddev(), + ); + println!( + "{:<24} {:>10.2}% ± {:<5.2} {:>9.2}% ± {:<5.2} {:>9.2}% ± {:<5.2}", + "Churn on n→n+1", + ub_changes.mean(), + ub_changes.stddev(), + b_changes.mean(), + b_changes.stddev(), + ring_changes.mean(), + ring_changes.stddev(), + ); + println!( + "{:<24} {:>13.1} ms {:>13.1} ms {:>13.1} ms", + "Total time", + ub_time_us as f64 / 1000.0, + b_time_us as f64 / 1000.0, + ring_time_us as f64 / 1000.0, + ); + println!("\n ideal churn: {:.2}%", 1.0 / (n + 1) as f64 * 100.0); +} + +fn main() { + // (num_tokens, k, n, num_seeds) + let configs: &[(usize, usize, usize, u64)] = &[ + (64, 3, 24, 1000), // original + (256, 3, 24, 1000), // more tokens, same k and n + (64, 1, 24, 1000), // k=1 (no replication) + (64, 5, 24, 1000), // higher replication + (64, 3, 8, 1000), // fewer machines + (64, 3, 60, 1000), // many machines (sparse) + (1_000_000, 3, 100_000, 1), // 1M tokens, 100k machines + ]; + + for (i, &(num_tokens, k, n, num_seeds)) in configs.iter().enumerate() { + if i > 0 { + println!("\n{}\n", "=".repeat(76)); + } + run(num_tokens, k, n, num_seeds); + } +} diff --git a/crates/consistent-choose-k/src/choose_k.rs b/crates/consistent-choose-k/src/choose_k.rs new file mode 100644 index 0000000..ce777ee --- /dev/null +++ b/crates/consistent-choose-k/src/choose_k.rs @@ -0,0 +1,297 @@ +use crate::{ConsistentHasher, ManySeqBuilder}; + +/// Implementation of a consistent choose k hashing algorithm. +/// It returns k distinct consistent hashes in the range `0..n`. +/// The hashes are consistent when `n` changes and when `k` changes! +/// I.e. one hash changes with probability `k/(n+1)` when `n` increases by one, +/// resp. one hash gets added when `k` is increased. Additionally, the returned `k` tuple +/// is guaranteed to be uniformely chosen from all possible `n-choose-k` tuples. +/// +/// Also implements `Iterator` to yield the next sample when k is increased. +/// Note: since this hashing algorithm implements choose k semantics, all the returned samples are distinct. +/// Note: The `Iterator` won't output the samples ordered by position. +/// +/// # Example +/// ``` +/// use std::hash::{DefaultHasher, Hash}; +/// use consistent_choose_k::ConsistentChooseKHasher; +/// +/// let mut h = DefaultHasher::default(); +/// 42u64.hash(&mut h); +/// let top3: Vec = ConsistentChooseKHasher::new(h, 100).take(3).collect(); +/// assert_eq!(top3.len(), 3); +/// ``` +pub struct ConsistentChooseKHasher { + builder: H, + n: usize, + samples: Vec, +} + +impl ConsistentChooseKHasher { + /// Create a new iterator for `n` nodes starting with k=0. + /// + /// Time: O(1) + pub fn new(builder: H, n: usize) -> Self { + Self { + builder, + n, + samples: Vec::new(), + } + } + + /// Create with the choose-k set for `k` out of `n` nodes pre-built. + /// + /// Average time: O(k^2) + pub fn new_with_k(builder: H, n: usize, k: usize) -> Self { + assert!(n >= k, "n must be at least k"); + let mut iter = Self::new(builder, n); + for i in 0..k { + iter.samples.push(iter.get_sample(i, n)); + } + for i in (0..k).rev() { + let s = iter.samples[0..=i].iter().copied().max().expect(""); + iter.samples[i] = s; + for j in 0..i { + if iter.samples[j] == s { + iter.samples[j] = iter.get_sample(j, s); + } + } + } + iter + } + + /// Returns the `k` underlying samples. + pub fn samples(&self) -> &[usize] { + &self.samples + } + + /// Converts self into the `k` underlying samples vector. + pub fn into_samples(self) -> Vec { + self.samples + } + + /// Returns the current universe size. + pub fn n(&self) -> usize { + self.n + } + + /// Returns the current sample size. + pub fn k(&self) -> usize { + self.samples.len() + } + + /// (Average) time: O(1) + fn get_sample(&self, k: usize, n: usize) -> usize { + ConsistentHasher::new(self.builder.seq_builder(k)) + .into_prev(n - k) + .expect("must not fail") + + k + } + + /// Decrements n to the largest sample and computes the new sample it is + /// being replaced with. Returns the index of the new largest sample. + /// + /// Time: O(k) + /// + /// Panics: if `n` is already at most `k`. + pub fn shrink_n(&mut self) -> usize { + assert!(self.n > self.k()); + let n = *self.samples.last().expect("samples must not be empty"); + self.n = n; + self.shrink_n_inner(n) + } + + fn shrink_n_inner(&mut self, mut n: usize) -> usize { + for i in (0..self.samples.len()).rev() { + if self.samples[i] < n { + // We are done! + return i + 1; + } + // The new maximum over all sequences at position i is either + // the sample of the sequence i or the maximum over all other sequences. + // The latter is already known via self.samples[i-1]. + let si = self.get_sample(i, n); + if i > 0 && self.samples[i - 1] > si { + self.samples[i] = self.samples[i - 1]; + } else { + self.samples[i] = si; + } + n = self.samples[i]; + } + 0 + } + + /// Grow the sample set by one element. Returns the index at which the new + /// element was inserted (i.e. its rank position). + /// + /// Time: O(k) + /// + /// Panics: if `k` equals `n`. + pub fn grow_k(&mut self) -> usize { + assert!(self.k() < self.n); + let k = self.samples.len(); + let sk = self.get_sample(k, self.n); + if let Some(last) = self.samples.last().copied() { + if last < sk { + self.samples.push(sk); + k + } else { + let i = self.shrink_n_inner(last); + self.samples.push(last); + i + } + } else { + self.samples.push(sk); + k + } + } +} + +impl Iterator for ConsistentChooseKHasher { + type Item = usize; + + fn next(&mut self) -> Option { + if self.samples.len() >= self.n { + return None; + } + let idx = self.grow_k(); + Some(self.samples[idx]) + } +} + +#[cfg(test)] +mod tests { + use std::hash::{DefaultHasher, Hash}; + + use super::*; + + fn hasher_for_key(key: u64) -> DefaultHasher { + let mut hasher = DefaultHasher::default(); + key.hash(&mut hasher); + hasher + } + + #[test] + fn test_ranking_matches_prev() { + // Every prefix of the ranking must equal the sorted prev(n) set. + for key in 0..200 { + for n in 2..30 { + let hasher = hasher_for_key(key); + let full: Vec = ConsistentChooseKHasher::new(hasher.clone(), n).collect(); + assert_eq!(full.len(), n); + for k in 1..=n { + let expected = + ConsistentChooseKHasher::new_with_k(hasher.clone(), n, k).into_samples(); + let mut prefix = full[..k].to_vec(); + prefix.sort(); + assert_eq!( + prefix, expected, + "key={key} n={n} k={k}: ranking prefix mismatch" + ); + } + } + } + } + + #[test] + fn test_ranking_k_equals_n() { + // When exhausted, the ranking contains all nodes 0..n. + for key in 0..200 { + for n in 1..20 { + let hasher = hasher_for_key(key); + let mut ranking: Vec = ConsistentChooseKHasher::new(hasher, n).collect(); + ranking.sort(); + let expected: Vec = (0..n).collect(); + assert_eq!(ranking, expected, "key={key} n={n}"); + } + } + } + + #[test] + fn test_uniform_k() { + const K: usize = 3; + for k in 0..100 { + let hasher = hasher_for_key(k); + for n in K..1000 { + let samples = + ConsistentChooseKHasher::new_with_k(hasher.clone(), n + 1, K).into_samples(); + assert!(samples.len() == K); + for i in 0..K - 1 { + assert!(samples[i] < samples[i + 1]); + } + let next = + ConsistentChooseKHasher::new_with_k(hasher.clone(), n + 2, K).into_samples(); + for i in 0..K { + assert!(samples[i] <= next[i]); + } + let mut merged = samples.clone(); + merged.extend(next.clone()); + merged.sort(); + merged.dedup(); + assert!( + merged.len() == K || merged.len() == K + 1, + "Unexpected {samples:?} vs. {next:?}" + ); + } + } + let mut stats = vec![0; 8]; + for i in 0..32 { + let hasher = hasher_for_key(i + 32783); + let samples = + ConsistentChooseKHasher::new_with_k(hasher, stats.len(), 2).into_samples(); + for s in samples { + stats[s] += 1; + } + } + println!("{stats:?}"); + assert_eq!(stats, vec![10, 12, 6, 6, 6, 5, 9, 10]); + // Test consistency when increasing k! + for k in 1..10 { + for n in k + 1..20 { + for key in 0..1000 { + let hasher = hasher_for_key(key); + let set1 = + ConsistentChooseKHasher::new_with_k(hasher.clone(), n, k).into_samples(); + let set2 = ConsistentChooseKHasher::new_with_k(hasher, n, k + 1).into_samples(); + assert_eq!(set1.len(), k); + assert_eq!(set2.len(), k + 1); + let mut merged = set1.clone(); + merged.extend(set2); + merged.sort(); + merged.dedup(); + assert_eq!(merged.len(), k + 1); + } + } + } + } + + #[test] + fn test_shrink_n() { + for k in 1..10 { + for n in k + 1..30 { + let mut iter = ConsistentChooseKHasher::new_with_k(DefaultHasher::new(), n, k); + while *iter.samples.last().unwrap() > k { + let expected = ConsistentChooseKHasher::new_with_k( + DefaultHasher::new(), + *iter.samples.last().unwrap(), + k, + ); + iter.shrink_n(); + assert_eq!(iter.samples, expected.samples); + } + } + } + } + + #[test] + fn test_grow_k() { + for n in 1..30 { + let mut iter = ConsistentChooseKHasher::new(DefaultHasher::new(), n); + for k in 1..10.min(n) { + let expected = ConsistentChooseKHasher::new_with_k(DefaultHasher::new(), n, k); + iter.grow_k(); + assert_eq!(iter.samples, expected.samples); + } + } + } +} diff --git a/crates/consistent-choose-k/src/consistent_hash.rs b/crates/consistent-choose-k/src/consistent_hash.rs new file mode 100644 index 0000000..0a44cde --- /dev/null +++ b/crates/consistent-choose-k/src/consistent_hash.rs @@ -0,0 +1,274 @@ +use std::hash::{Hash, Hasher}; + +/// A trait which behaves like a pseudo-random number generator. +/// It is used to generate consistent hashes within one bucket. +/// Note: the hasher must have been seeded with the key during construction. +pub trait HashSequence { + fn next(&mut self) -> u64; +} + +/// A trait for building a special bit mask and sequences of hashes for different bit positions. +/// Note: the hasher must have been seeded with the key during construction. +pub trait HashSeqBuilder { + type Seq: HashSequence; + + /// Returns a bit mask indicating which buckets have at least one hash. + fn bit_mask(&self) -> u64; + /// Return a HashSequence instance which is seeded with the given bit position + /// and the seed of this builder. + fn hash_seq(&self, bit: u64) -> Self::Seq; +} + +/// A trait for building multiple independent hash builders +/// Note: the hasher must have been seeded with the key during construction. +pub trait ManySeqBuilder { + type Builder: HashSeqBuilder; + + /// Returns the i-th independent hash builder. + fn seq_builder(&self, i: usize) -> Self::Builder; +} + +impl HashSequence for H { + fn next(&mut self) -> u64 { + 54387634019u64.hash(self); + self.finish() + } +} + +impl HashSeqBuilder for H { + type Seq = H; + + fn bit_mask(&self) -> u64 { + self.finish() + } + + fn hash_seq(&self, bit: u64) -> Self::Seq { + let mut hasher = self.clone(); + bit.hash(&mut hasher); + hasher + } +} + +impl ManySeqBuilder for H { + type Builder = H; + + fn seq_builder(&self, i: usize) -> Self::Builder { + let mut hasher = self.clone(); + i.hash(&mut hasher); + hasher + } +} + +/// One building block for the consistent hashing algorithm is a consistent +/// hash iterator which enumerates all the hashes for a specific bucket. +/// A bucket covers the range `(1< { + hasher: H, + n: usize, // Upper bound for the hash values within the bucket. + is_first: bool, + bit: u64, // A bitmask with a single bit set. +} + +impl BucketIterator { + fn new(n: usize, bit: u64, hasher: H) -> Self { + Self { + hasher, + n, + is_first: true, + bit, + } + } +} + +impl Iterator for BucketIterator { + type Item = usize; + + fn next(&mut self) -> Option { + if self.bit == 0 { + return None; + } + if self.is_first { + let res = (self.hasher.next() & (self.bit - 1)) + self.bit; + self.is_first = false; + if res < self.n as u64 { + self.n = res as usize; + return Some(self.n); + } + } + loop { + let res = self.hasher.next() & (self.bit * 2 - 1); + if res & self.bit == 0 { + return None; + } + if res < self.n as u64 { + self.n = res as usize; + return Some(self.n); + } + } + } +} + +/// An iterator which enumerates all the consistent hashes for a given key +/// from largest to smallest in the range `0..n`. +pub struct ConsistentHashRevIterator { + builder: H, + bits: u64, // Bitmask of unvisited buckets. + n: usize, // Exclusive upper bound for the hash values. + inner: Option>, // Iterator for the current bucket. +} + +impl ConsistentHashRevIterator { + pub fn new(n: usize, builder: H) -> Self { + Self { + bits: builder.bit_mask() & (n.next_power_of_two() as u64 - 1), + builder, + n, + inner: None, + } + } +} + +impl Iterator for ConsistentHashRevIterator { + type Item = usize; + + fn next(&mut self) -> Option { + if self.n == 0 { + return None; + } + if let Some(res) = self.inner.as_mut().and_then(|inner| inner.next()) { + return Some(res); + } + while self.bits > 0 { + let bit = 1 << self.bits.ilog2(); + self.bits ^= bit; + let seq = self.builder.hash_seq(bit); + let mut iter = BucketIterator::new(self.n, bit, seq); + if let Some(res) = iter.next() { + self.inner = Some(iter); + return Some(res); + } + } + self.n = 0; + Some(0) + } +} + +/// Same as `ConsistentHashRevIterator`, but iterates from smallest to largest +/// for the range `n..`. +pub struct ConsistentHashIterator { + bits: u64, // Bitmasks of unvisited buckets. + n: usize, // Inclusive lower bound for the hash values. + builder: H, + stack: Vec, // Stack of hashes in the current bucket. +} + +impl ConsistentHashIterator { + pub fn new(n: usize, builder: H) -> Self { + Self { + bits: builder.bit_mask() & !((n + 2).next_power_of_two() as u64 / 2 - 1), + stack: if n == 0 { vec![0] } else { vec![] }, + builder, + n, + } + } +} + +impl Iterator for ConsistentHashIterator { + type Item = usize; + + fn next(&mut self) -> Option { + if let Some(res) = self.stack.pop() { + return Some(res); + } + while self.bits > 0 { + let bit = self.bits & !(self.bits - 1); + self.bits &= self.bits - 1; + let inner = BucketIterator::new(bit as usize * 2, bit, self.builder.hash_seq(bit)); + self.stack = inner.take_while(|x| *x >= self.n).collect(); + if let Some(res) = self.stack.pop() { + return Some(res); + } + } + None + } +} + +/// Wrapper around `ConsistentHashIterator` and `ConsistentHashRevIterator` to compute +/// the next or previous consistent hash for a given key for a given number of nodes `n`. +pub struct ConsistentHasher { + builder: H, +} + +impl ConsistentHasher { + /// Construct a new ConsistentHasher with the given builder for a specific key. + pub fn new(builder: H) -> Self { + Self { builder } + } + + /// Return the largest consistent hash smaller than `n`. + pub fn prev(&self, n: usize) -> Option + where + H: Clone, + { + let mut sampler = ConsistentHashRevIterator::new(n, self.builder.clone()); + sampler.next() + } + + /// Return the smallest consistent hash greater than or equal to `n`. + pub fn next(&self, n: usize) -> Option + where + H: Clone, + { + let mut sampler = ConsistentHashIterator::new(n, self.builder.clone()); + sampler.next() + } + + /// Return the largest consistent hash smaller than `n`, consuming the hasher. + pub fn into_prev(self, n: usize) -> Option { + ConsistentHashRevIterator::new(n, self.builder).next() + } +} + +#[cfg(test)] +mod tests { + use std::hash::DefaultHasher; + + use super::*; + + fn hasher_for_key(key: u64) -> DefaultHasher { + let mut hasher = DefaultHasher::default(); + key.hash(&mut hasher); + hasher + } + + #[test] + fn test_uniform_1() { + for k in 0..100 { + let hasher = hasher_for_key(k); + let sampler = ConsistentHasher::new(hasher.clone()); + for n in 0..1000 { + assert!(sampler.prev(n + 1) <= sampler.prev(n + 2)); + let next = sampler.next(n).unwrap(); + assert_eq!(next, sampler.prev(next + 1).unwrap()); + } + let mut iter_rev: Vec<_> = ConsistentHashIterator::new(0, hasher.clone()) + .take_while(|x| *x < 1000) + .collect(); + iter_rev.reverse(); + let iter: Vec<_> = ConsistentHashRevIterator::new(1000, hasher).collect(); + assert_eq!(iter, iter_rev); + } + let mut stats = vec![0; 13]; + for i in 0..100000 { + let hasher = hasher_for_key(i); + let sampler = ConsistentHasher::new(hasher); + let x = sampler.prev(stats.len()).unwrap(); + stats[x] += 1; + } + assert_eq!( + stats, + vec![7577, 7541, 7538, 7822, 7763, 7687, 7718, 7723, 7846, 7723, 7688, 7716, 7658] + ); + } +} diff --git a/crates/consistent-choose-k/src/lib.rs b/crates/consistent-choose-k/src/lib.rs new file mode 100644 index 0000000..8a2c392 --- /dev/null +++ b/crates/consistent-choose-k/src/lib.rs @@ -0,0 +1,9 @@ +mod choose_k; +mod consistent_hash; +mod node_map; +pub use choose_k::ConsistentChooseKHasher; +pub use consistent_hash::{ + ConsistentHashIterator, ConsistentHashRevIterator, ConsistentHasher, HashSeqBuilder, + HashSequence, ManySeqBuilder, +}; +pub use node_map::ConsistentNodeMap; diff --git a/crates/consistent-choose-k/src/node_map.rs b/crates/consistent-choose-k/src/node_map.rs new file mode 100644 index 0000000..9bcb491 --- /dev/null +++ b/crates/consistent-choose-k/src/node_map.rs @@ -0,0 +1,350 @@ +use std::collections::HashSet; + +use crate::{ConsistentChooseKHasher, ManySeqBuilder}; + +/// A consistent node map that supports dynamic addition and deletion of nodes. +/// +/// Slots are tracked by storing the total number of slots and a set of deleted +/// slots. To find the slot associated with a key, the consistent choose-k +/// iterator yields positions in a consistent order; the first non-deleted slot +/// is returned. +/// +/// # Comparison with AnchorHash, MementoHash, and DxHash +/// +/// This solves the same problem as [AnchorHash], [MementoHash], and [DxHash]: +/// consistently mapping keys to a dynamic set of nodes where nodes can be +/// added and removed, with minimal key reassignment. All of these algorithms +/// guarantee that when a node is removed, only keys assigned to that node are +/// redistributed — and they are redistributed uniformly among the remaining +/// nodes. +/// +/// The key difference is history. AnchorHash, MementoHash, and DxHash keep +/// redirect/replacement state so that when lookup hits a deleted node, it can +/// replay enough of the prior removal process to find the correct replacement. +/// MementoHash, for example, defines its state as ``, where `R` is a +/// set of replacement tuples and `l` is the last removed bucket. This +/// implementation is history-independent: it only needs to know which slots are +/// currently deleted. Lookup simply iterates the consistent choose-k sequence +/// until it hits an active slot. +/// +/// This implementation takes a much simpler approach: it leverages the +/// consistent choose-k algorithm, which already provides both n-consistency +/// and k-consistency by construction. No auxiliary redirect structures are +/// needed beyond the current set of deleted slots. +/// +/// Let `total` be the number of slots, `active` the number of active slots, and +/// `h` the number of deleted slots hit during a lookup before the first active +/// slot is found. For AnchorHash and DxHash, `total` is the predefined capacity; +/// for MementoHash and this implementation, it is the current slot count. +/// MementoHash bounds the expected number of deleted-node hits by harmonic +/// sums, e.g. `1 + H_total - H_active`, which is at most +/// `1 + ln(total / active)`. +/// +/// In this implementation, the choose-k iterator never returns the same slot +/// twice, so a deleted slot can be hit at most once during a lookup. Thus the +/// scan has the same deleted-hit behavior as the history-based algorithms, but +/// without storing the deletion history. +/// +/// The current choose-k iterator costs O(k) to produce the k-th candidate, so a +/// lookup that skips `h` deleted slots costs O((h + 1)^2), and the corresponding +/// expected total lookup cost is O((1 + ln(total / active))^2). This is in the +/// same practical complexity regime as history-based redirection schemes: the +/// cost grows roughly quadratically with the number of deleted-node hits, while +/// the expected number of such hits stays small unless many slots are deleted. +/// +/// | Algorithm | Total lookup time | State | Predefined capacity? | History-dependent? | +/// | --- | --- | --- | --- | --- | +/// | `ConsistentNodeMap` | `O((h + 1)^2)`, expected `O((1 + ln(total / active))^2)` | `O(deleted)` deleted-slot set | No | No | +/// | AnchorHash | `O((h + 1)^2)`, expected `O((1 + ln(total / active))^2)` | `O(capacity)` anchor/removal state | Yes | Yes | +/// | MementoHash | `O((h + 1)^2)`, expected `O((1 + ln(total / active))^2)` | `O(deleted)` replacement tuples | No | Yes | +/// | DxHash | `O((h + 1)^2)`, expected `O((1 + ln(total / active))^2)` | `O(capacity)` redirect/displacement state with smaller constants than AnchorHash | Yes | Yes | +/// +/// The MementoHash paper explicitly notes that AnchorHash and DxHash keep an +/// internal data structure for all cluster nodes, both working and not working, +/// and require the overall capacity to be fixed during initialization. +/// MementoHash reduces memory by storing only replacement information for +/// removed buckets, but that replacement information still encodes the removal +/// history. This implementation has the same O(deleted) storage shape as that +/// idea, but stores only the deleted set. +/// +/// [AnchorHash]: https://arxiv.org/abs/1812.09674 +/// [MementoHash]: https://arxiv.org/abs/2306.09783 +/// [DxHash]: https://doi.org/10.1145/3631708 +/// +/// # Example +/// ``` +/// use std::hash::{DefaultHasher, Hash}; +/// use consistent_choose_k::ConsistentNodeMap; +/// +/// let mut map = ConsistentNodeMap::new(); +/// let a = map.add(); +/// let b = map.add(); +/// let c = map.add(); +/// +/// let mut h = DefaultHasher::default(); +/// 42u64.hash(&mut h); +/// let slot = map.get(h).unwrap(); +/// assert!(slot == a || slot == b || slot == c); +/// ``` +pub struct ConsistentNodeMap { + total: usize, + deleted: HashSet, +} + +impl Default for ConsistentNodeMap { + fn default() -> Self { + Self::new() + } +} + +impl ConsistentNodeMap { + /// Create an empty node map. + pub fn new() -> Self { + Self { + total: 0, + deleted: HashSet::new(), + } + } + + /// Add a slot and return its index. + /// + /// If there is a previously deleted slot, it will be reused. + pub fn add(&mut self) -> usize { + if let Some(i) = self.deleted.iter().next().copied() { + self.deleted.remove(&i); + i + } else { + let i = self.total; + self.total += 1; + i + } + } + + /// Remove the slot at the given index. Returns true if it was active. + pub fn remove(&mut self, index: usize) -> bool { + if index >= self.total || self.deleted.contains(&index) { + return false; + } + if index == self.total - 1 { + self.total -= 1; + } else { + self.deleted.insert(index); + } + true + } + + /// Returns the number of active slots. + pub fn len(&self) -> usize { + self.total - self.deleted.len() + } + + /// Returns true if there are no active slots. + pub fn is_empty(&self) -> bool { + self.total == self.deleted.len() + } + + /// Returns the total number of slots (including deleted ones). + pub fn slot_count(&self) -> usize { + self.total + } + + /// Returns whether the slot at the given index is active. + pub fn is_active(&self, index: usize) -> bool { + index < self.total && !self.deleted.contains(&index) + } + + /// Look up which slot a key maps to using consistent hashing. + /// + /// The `builder` should be a hasher seeded with the key. The consistent + /// choose-k iterator yields positions in a consistent order; the first + /// active slot is returned. + pub fn get(&self, builder: H) -> Option { + if self.is_empty() { + return None; + } + let mut iter = ConsistentChooseKHasher::new(builder, self.total); + iter.find(|pos| !self.deleted.contains(pos)) + } +} + +#[cfg(test)] +mod tests { + use std::hash::{DefaultHasher, Hash}; + + use super::*; + + fn hasher_for_key(key: u64) -> DefaultHasher { + let mut hasher = DefaultHasher::default(); + key.hash(&mut hasher); + hasher + } + + #[test] + fn test_add_remove() { + let mut map = ConsistentNodeMap::new(); + let a = map.add(); + let b = map.add(); + assert_eq!(map.len(), 2); + + assert!(map.remove(a)); + assert_eq!(map.len(), 1); + for key in 0..100 { + assert_eq!(map.get(hasher_for_key(key)), Some(b)); + } + + assert!(map.remove(b)); + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + assert!(map.get(hasher_for_key(0)).is_none()); + } + + #[test] + fn test_remove_returns_false_for_inactive() { + let mut map = ConsistentNodeMap::new(); + let a = map.add(); + assert!(map.remove(a)); + assert!(!map.remove(a)); + assert!(!map.remove(999)); + } + + #[test] + fn test_slot_reuse() { + let mut map = ConsistentNodeMap::new(); + map.add(); + let b = map.add(); + map.add(); + assert_eq!(map.slot_count(), 3); + + map.remove(b); + let d = map.add(); + assert_eq!(d, b); + assert_eq!(map.slot_count(), 3); + assert!(map.is_active(d)); + } + + #[test] + fn test_trailing_pop() { + let mut map = ConsistentNodeMap::new(); + let a = map.add(); // 0 + let b = map.add(); // 1 + let c = map.add(); // 2 + assert_eq!(map.slot_count(), 3); + + // Removing last slot pops it. + map.remove(c); + assert_eq!(map.slot_count(), 2); + + // Removing last again pops it. + map.remove(b); + assert_eq!(map.slot_count(), 1); + + // Middle removal is tracked as deleted, not popped. + let b2 = map.add(); // appends as 1 + let c2 = map.add(); // appends as 2 + assert_eq!(b2, 1); + assert_eq!(c2, 2); + map.remove(b2); // middle -> deleted set + assert_eq!(map.slot_count(), 3); + map.remove(c2); // trailing → only pops c2 + assert_eq!(map.slot_count(), 2); // b2 slot stays as inactive + assert_eq!(map.len(), 1); + assert!(map.is_active(a)); + } + + #[test] + fn test_consistency_after_add() { + let mut map = ConsistentNodeMap::new(); + for _ in 0..10 { + map.add(); + } + let before: Vec<_> = (0..10000) + .map(|k| map.get(hasher_for_key(k)).unwrap()) + .collect(); + map.add(); + let after: Vec<_> = (0..10000) + .map(|k| map.get(hasher_for_key(k)).unwrap()) + .collect(); + let changed = before.iter().zip(&after).filter(|(a, b)| a != b).count(); + assert!( + changed < 2000, + "too many keys changed after add: {changed}/10000" + ); + } + + #[test] + fn test_remove_10_percent_consistency() { + let n = 100; + let num_keys = 100_000u64; + let to_remove: Vec = (0..n).step_by(10).collect(); // 10% of nodes + + let mut map = ConsistentNodeMap::new(); + for _ in 0..n { + map.add(); + } + + let before: Vec = (0..num_keys) + .map(|k| map.get(hasher_for_key(k)).unwrap()) + .collect(); + + for &slot in &to_remove { + map.remove(slot); + } + let remaining = map.len(); + + let after: Vec = (0..num_keys) + .map(|k| map.get(hasher_for_key(k)).unwrap()) + .collect(); + + // 1. Keys not on removed nodes must stay on the same node. + let mut displaced = 0u64; + for (k, (b, a)) in before.iter().zip(&after).enumerate() { + if !to_remove.contains(b) { + assert_eq!( + b, a, + "key {k}: slot changed from {b} to {a} but was not on a removed slot" + ); + } else { + displaced += 1; + assert!( + !to_remove.contains(a), + "key {k}: reassigned to removed slot {a}" + ); + } + } + + // 2. Displaced fraction should be very close to the theoretical value. + let displaced_pct = displaced as f64 / num_keys as f64; + let theoretical_pct = to_remove.len() as f64 / n as f64; + assert!( + (displaced_pct - theoretical_pct).abs() < 0.01, + "displaced fraction {displaced_pct:.4} not close to theoretical {theoretical_pct:.4}" + ); + + // 3. After removal, distribution among remaining nodes should be + // roughly uniform: each node gets ~1/remaining of all keys. + let mut counts = vec![0u64; n]; + for &a in &after { + counts[a] += 1; + } + let expected = num_keys as f64 / remaining as f64; + let chi2: f64 = counts + .iter() + .enumerate() + .filter(|(i, _)| !to_remove.contains(i)) + .map(|(_, &c)| { + let diff = c as f64 - expected; + diff * diff / expected + }) + .sum(); + // Chi-squared critical value for 89 df at p=0.001 is ~122.9. + assert!( + chi2 < 200.0, + "distribution not uniform enough: chi2={chi2:.1} (expected < 200)" + ); + + // 4. Removed slots must have zero keys. + for &slot in &to_remove { + assert_eq!(counts[slot], 0, "removed slot {slot} still has keys"); + } + } +} diff --git a/crates/sparse-ngrams/Cargo.toml b/crates/sparse-ngrams/Cargo.toml index 80ca0f7..9b7d9ea 100644 --- a/crates/sparse-ngrams/Cargo.toml +++ b/crates/sparse-ngrams/Cargo.toml @@ -17,4 +17,4 @@ path = "benchmarks/performance.rs" harness = false [dev-dependencies] -criterion = "0.7" +criterion = "0.8" diff --git a/crates/sparse-ngrams/benchmarks/performance.rs b/crates/sparse-ngrams/benchmarks/performance.rs index ac2b2c3..123588a 100644 --- a/crates/sparse-ngrams/benchmarks/performance.rs +++ b/crates/sparse-ngrams/benchmarks/performance.rs @@ -1,4 +1,6 @@ -use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use std::hint::black_box; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use sparse_ngrams::{ collect_sparse_grams_deque, collect_sparse_grams_scan, max_sparse_grams, NGram, };