Consistent hashing helps provide load balancing in a distributed system. Let’s say we have multiple servers to accommodate high traffic, and we need a way to distribute that traffic evenly across all of them. To get there, let’s discuss a couple of approaches and their drawbacks.
Approach 1: Hashing
A simple hashing technique computes a hash of the key and takes it modulo k, where k denotes the number of servers in the system: server = hash(key) mod k.
With this approach, a skewed hash can send many keys to the same server and overload it. It also causes problems in a dynamic environment: the server index depends on k, so removing or adding even one server changes the result of the modulo for most keys, and they all have to be rehashed and moved.
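To make the rehashing cost concrete, here is a minimal sketch that counts how many keys change servers when the pool grows from 4 to 5. The class and method names are illustrative, and `String.hashCode()` only stands in for whatever hash function a real system would use. With a uniform hash, a key keeps its server only when `hash mod 4` equals `hash mod 5`, which is true for about one key in five.

```java
import java.util.ArrayList;
import java.util.List;

class ModuloHashingDemo {

    // Map a key to a server index in [0, serverCount) using plain modulo hashing.
    static int serverFor(String key, int serverCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), serverCount);
    }

    // Count the keys whose server changes when the pool size changes.
    static int movedKeys(List<String> keys, int before, int after) {
        int moved = 0;
        for (String key : keys) {
            if (serverFor(key, before) != serverFor(key, after)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            keys.add("user-" + i); // hypothetical request keys
        }
        int moved = movedKeys(keys, 4, 5);
        System.out.println(moved + " of " + keys.size() + " keys moved after adding a fifth server");
    }
}
```

Running it prints the number of keys that would have to be moved, which is the cost the next approach tries to avoid.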
Approach 2: Consistent Hashing
Consistent hashing proposes placing the servers on a ‘Virtual Ring’ (a circular array). We hash each key to get the index it falls on, and the key is served by the first server found when moving clockwise from that index.
This approach makes the system dynamic, because adding or removing a node doesn’t cause a complete rehash. Only a small portion of the requests or data is remapped: on average roughly 1/n of the keys, where n is the number of servers.
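The remapping claim can be checked with a small experiment. The sketch below is an illustration under assumptions, not a reference implementation: it places each server on the ring once (no virtual nodes), uses the first four bytes of SHA-256 as the ring position, and uses made-up server and key names. It then adds a fifth server and counts which keys changed owner.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.TreeMap;

class RingRemapDemo {

    // Ring position: first 4 bytes of SHA-256, with the sign bit cleared.
    static int hash(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(key.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getInt() & 0x7fffffff;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Build a ring with one position per server.
    static TreeMap<Integer, String> ringOf(String... servers) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        for (String server : servers) {
            ring.put(hash(server), server);
        }
        return ring;
    }

    // First server at or after the key's position, wrapping around the ring.
    static String lookup(TreeMap<Integer, String> ring, String key) {
        Integer position = ring.ceilingKey(hash(key));
        return ring.get(position != null ? position : ring.firstKey());
    }

    // Returns {keys that changed server, keys that now map to the new server S5}.
    static int[] countMoves(int keyCount) {
        TreeMap<Integer, String> before = ringOf("S1", "S2", "S3", "S4");
        TreeMap<Integer, String> after = ringOf("S1", "S2", "S3", "S4", "S5");
        int moved = 0;
        int movedToNew = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "user-" + i;
            String newOwner = lookup(after, key);
            if (!lookup(before, key).equals(newOwner)) {
                moved++;
                if (newOwner.equals("S5")) {
                    movedToNew++;
                }
            }
        }
        return new int[] {moved, movedToNew};
    }

    public static void main(String[] args) {
        int[] result = countMoves(10_000);
        System.out.println("Keys moved: " + result[0] + ", of which moved to S5: " + result[1]);
    }
}
```

The two numbers always match: a key only moves if the new server landed between it and its old owner, so every other key stays where it was. With a single position per server the size of that moved slice varies a lot from run to run of server names, which is the imbalance discussed next.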
Note: On a ring like this, it is possible that the majority of keys fall into the same segment, overloading a single node. This tends to happen when there are only a few nodes on the ring.
The solution to the above problem is to place virtual nodes across the ring: each server is hashed to many positions instead of one, so its share of the ring is spread out and the load evens out probabilistically.
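The effect of virtual nodes is easiest to see by counting. The following sketch makes several assumptions: the `server#index` naming for virtual nodes, the SHA-256 based position, and the sample keys `request-0`, `request-1`, and so on are all just choices for the demo. It reports how many sample keys each of three servers receives with 1 and with 200 virtual nodes per server.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

class VirtualNodeSpreadDemo {

    // Ring position: first 4 bytes of SHA-256, with the sign bit cleared.
    static int hash(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(key.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getInt() & 0x7fffffff;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Count how many of keyCount sample keys land on each server.
    static Map<String, Integer> loadPerServer(String[] servers, int virtualNodes, int keyCount) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        for (String server : servers) {
            for (int v = 0; v < virtualNodes; v++) {
                ring.put(hash(server + "#" + v), server);
            }
        }
        Map<String, Integer> load = new TreeMap<>();
        for (String server : servers) {
            load.put(server, 0);
        }
        for (int i = 0; i < keyCount; i++) {
            Integer position = ring.ceilingKey(hash("request-" + i));
            String owner = ring.get(position != null ? position : ring.firstKey());
            load.merge(owner, 1, Integer::sum);
        }
        return load;
    }

    public static void main(String[] args) {
        String[] servers = {"S1", "S2", "S3"};
        System.out.println("1 virtual node each:    " + loadPerServer(servers, 1, 30_000));
        System.out.println("200 virtual nodes each: " + loadPerServer(servers, 200, 30_000));
    }
}
```

With many virtual nodes each server's share should sit close to a third of the keys, while the single-position layout depends entirely on where three hashes happen to fall.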
Implementation:
For a fast lookup of the target node for any request, a balanced BST is used to store the positions of all virtual nodes on the ring. A balanced BST is preferred because it provides $O(\log N)$ time for ‘peek’ (finding the next position), ‘add’ and ‘remove’, where N is the number of virtual nodes.
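In Java, `TreeMap` is a red-black tree, so it can play the role of the balanced BST. The sketch below uses hand-picked ring positions (100, 200, 300) instead of real hashes so the result is easy to follow; `successor` is a hypothetical helper name. `ceilingEntry` finds the first position at or after the key, and falling back to `firstEntry` gives the wrap-around at the end of the ring.

```java
import java.util.Map;
import java.util.TreeMap;

class RingLookupDemo {

    // Return the server at the first ring position >= keyPosition,
    // wrapping to the start of the ring when there is none.
    static String successor(TreeMap<Integer, String> ring, int keyPosition) {
        Map.Entry<Integer, String> entry = ring.ceilingEntry(keyPosition);
        if (entry == null) {
            entry = ring.firstEntry(); // wrap around; still null if the ring is empty
        }
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        ring.put(100, "S1");
        ring.put(200, "S2");
        ring.put(300, "S3");

        System.out.println(successor(ring, 150)); // S2: next position clockwise is 200
        System.out.println(successor(ring, 300)); // S3: an exact match counts
        System.out.println(successor(ring, 350)); // S1: past the last position, wrap to 100
    }
}
```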
Let’s take an example: we have a service with multiple instances available, and we want to load balance it so that no single node faces high traffic.
Steps to follow:
- Assign each service instance to a hash ring.
  - Hash the instance ID (e.g., IP:Port or Instance Name) onto the ring.
  - Example: hash("S1"), hash("S2"), hash("S3") → positions on the ring.
- Use Virtual Nodes (to evenly distribute load).
  - Instead of hashing each instance once, create multiple virtual nodes per instance.
  - Example: hash("S1#1"), hash("S1#2"), hash("S1#3") to spread S1 across the ring.
- Route incoming requests based on hash.
  - Hash the request (e.g., hash(clientIP), hash(sessionID), or hash(userID)).
  - Find the next clockwise instance on the ring.
- When an instance is added/removed, only a small portion of traffic is re-routed.
  - If S2 is removed, its traffic is shifted to the next closest node, instead of redistributing everything.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;

public class ConsistentHashingLoadBalancer {
    // Ring position -> instance name, kept sorted so the clockwise successor is cheap to find
    private final SortedMap<Integer, String> ring = new TreeMap<>();
    private final int virtualNodes;

    public ConsistentHashingLoadBalancer(List<String> serviceInstances, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String instance : serviceInstances) {
            addInstance(instance);
        }
    }

    // Hash function: first 4 bytes of the SHA-256 digest
    private int hash(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(key.getBytes(StandardCharsets.UTF_8));
            // Math.abs(Integer.MIN_VALUE) is still negative, so clear the sign bit instead
            return ByteBuffer.wrap(digest).getInt() & 0x7fffffff;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Add an instance with virtual nodes
    public void addInstance(String instance) {
        for (int i = 0; i < virtualNodes; i++) {
            int hash = hash(instance + "#" + i);
            ring.put(hash, instance);
        }
    }

    // Remove an instance and its virtual nodes
    public void removeInstance(String instance) {
        for (int i = 0; i < virtualNodes; i++) {
            int hash = hash(instance + "#" + i);
            // Two-argument remove: leaves the entry alone if the position belongs to another instance
            ring.remove(hash, instance);
        }
    }

    // Get the instance for a given request
    public String getInstance(String key) {
        if (ring.isEmpty()) return null;
        int hash = hash(key);
        // Positions at or after the key's hash; if there are none, wrap around to the first position
        SortedMap<Integer, String> tailMap = ring.tailMap(hash);
        Integer targetHash = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
        return ring.get(targetHash);
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("S1", "S2", "S3");
        ConsistentHashingLoadBalancer lb = new ConsistentHashingLoadBalancer(instances, 3);
        String request1 = "client1";
        String request2 = "client2";
        System.out.println("Request1 routed to: " + lb.getInstance(request1));
        System.out.println("Request2 routed to: " + lb.getInstance(request2));

        // Simulate instance failure
        lb.removeInstance("S2");
        System.out.println("After S2 removal, Request1 routed to: " + lb.getInstance(request1));
    }
}
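The solution above is good enough for small systems, but to scale it further an enhancement is needed, because a single load balancer holding the ring is a SPOF (single point of failure). Keeping the ring in one central coordinator has the same weakness; ZooKeeper, for example, is only fault tolerant when it runs as a replicated ensemble rather than as a single server.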
SPOF can be avoided by replicating the load balancer: each replica maintains a snapshot of the index tree, and the replicas keep it in sync through a gossip protocol. To check whether the state differs between two peers, a Merkle tree can be used.
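As a rough illustration of the Merkle tree idea, the sketch below computes a Merkle root over the entries of a ring and compares two replicas by root only. Everything here is an assumption made for the demo: the `position=instance` leaf encoding, the choice to carry an unpaired node up unchanged, and the helper names. A fuller implementation would also walk down the two trees to locate the differing entries instead of only detecting that a difference exists.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class RingMerkleDemo {

    // SHA-256 over the concatenation of the given byte arrays.
    static byte[] sha256(byte[]... parts) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            for (byte[] part : parts) {
                md.update(part);
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Merkle root over the ring entries, taken in key order.
    static byte[] merkleRoot(SortedMap<Integer, String> ring) {
        List<byte[]> level = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : ring.entrySet()) {
            String leaf = entry.getKey() + "=" + entry.getValue();
            level.add(sha256(leaf.getBytes(StandardCharsets.UTF_8)));
        }
        if (level.isEmpty()) {
            return sha256(); // hash of empty input stands for an empty ring
        }
        while (level.size() > 1) {
            List<byte[]> next = new ArrayList<>();
            for (int i = 0; i < level.size(); i += 2) {
                if (i + 1 < level.size()) {
                    next.add(sha256(level.get(i), level.get(i + 1)));
                } else {
                    next.add(level.get(i)); // unpaired node is carried up unchanged
                }
            }
            level = next;
        }
        return level.get(0);
    }

    // Two replicas hold the same ring exactly when their roots match (barring hash collisions).
    static boolean inSync(SortedMap<Integer, String> a, SortedMap<Integer, String> b) {
        return Arrays.equals(merkleRoot(a), merkleRoot(b));
    }

    public static void main(String[] args) {
        SortedMap<Integer, String> replicaA = new TreeMap<>();
        replicaA.put(100, "S1");
        replicaA.put(200, "S2");
        replicaA.put(300, "S3");
        SortedMap<Integer, String> replicaB = new TreeMap<>(replicaA);

        System.out.println("In sync before change: " + inSync(replicaA, replicaB)); // true
        replicaB.remove(200); // replica B has seen S2 leave, replica A has not
        System.out.println("In sync after change:  " + inSync(replicaA, replicaB)); // false
    }
}
```

Comparing a 32-byte root is much cheaper than shipping the whole ring on every gossip round, which is why the full exchange only needs to happen when the roots differ.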
Assuming the gateway only delegates requests to the load balancer and has no routing capability of its own, once the load balancer gets a request it looks up the next available server on the ring and routes the request to that server.
Implementation Using Gossip Protocol:
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;

public class GossipConsistentHashing {
    private final SortedMap<Integer, String> ring = new TreeMap<>();
    // Tombstones for removed instances, so a peer's stale ring cannot re-add them.
    // Re-adding a removed instance is not handled here; that would need versioned entries.
    private final Set<String> removedInstances = new HashSet<>();
    private final int virtualNodes;

    public GossipConsistentHashing(List<String> initialInstances, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String instance : initialInstances) {
            addInstance(instance);
        }
    }

    // Hash function: first 4 bytes of the SHA-256 digest, sign bit cleared
    private int hash(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(key.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getInt() & 0x7fffffff;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Add instance with virtual nodes
    public synchronized void addInstance(String instance) {
        for (int i = 0; i < virtualNodes; i++) {
            int hash = hash(instance + "#" + i);
            ring.put(hash, instance);
        }
    }

    // Remove instance and remember the removal so it can be gossiped to peers
    public synchronized void removeInstance(String instance) {
        removedInstances.add(instance);
        for (int i = 0; i < virtualNodes; i++) {
            int hash = hash(instance + "#" + i);
            ring.remove(hash, instance);
        }
    }

    // Get responsible instance for a given request
    public synchronized String getInstance(String key) {
        if (ring.isEmpty()) return null;
        int hash = hash(key);
        SortedMap<Integer, String> tailMap = ring.tailMap(hash);
        Integer targetHash = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
        return ring.get(targetHash);
    }

    // Gossip Protocol (pull only): merge a peer's hash ring state into this node's state
    public void gossipExchange(GossipConsistentHashing peer) {
        // Snapshot the peer before locking this node, so two nodes
        // gossiping with each other at the same time cannot deadlock
        Set<String> peerRemoved = peer.getRemovedSnapshot();
        SortedMap<Integer, String> peerRing = peer.getRingSnapshot();
        synchronized (this) {
            removedInstances.addAll(peerRemoved);
            for (Map.Entry<Integer, String> entry : peerRing.entrySet()) {
                ring.putIfAbsent(entry.getKey(), entry.getValue());
            }
            // Drop every virtual node that belongs to an instance known to be removed
            ring.values().removeIf(removedInstances::contains);
        }
    }

    // Get a copy of the current hash ring state
    public synchronized SortedMap<Integer, String> getRingSnapshot() {
        return new TreeMap<>(ring);
    }

    // Get a copy of the instances this node knows were removed
    public synchronized Set<String> getRemovedSnapshot() {
        return new HashSet<>(removedInstances);
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("S1", "S2", "S3");
        GossipConsistentHashing node1 = new GossipConsistentHashing(instances, 3);
        GossipConsistentHashing node2 = new GossipConsistentHashing(instances, 3);
        GossipConsistentHashing node3 = new GossipConsistentHashing(instances, 3);

        // Simulate Gossip between nodes
        node1.gossipExchange(node2);
        node2.gossipExchange(node3);
        node3.gossipExchange(node1);
        System.out.println("Node 1 routing clientA to: " + node1.getInstance("clientA"));
        System.out.println("Node 2 routing clientA to: " + node2.getInstance("clientA"));
        System.out.println("Node 3 routing clientA to: " + node3.getInstance("clientA"));

        // Simulate instance failure
        node1.removeInstance("S2");

        // More gossiping to propagate the removal. Exchanges are pull-only,
        // so each node has to pull from a peer that already knows about it.
        node2.gossipExchange(node1);
        node3.gossipExchange(node2);
        System.out.println("After S2 removal, Node 1 routing clientA to: " + node1.getInstance("clientA"));
        System.out.println("After S2 removal, Node 2 routing clientA to: " + node2.getInstance("clientA"));
    }
}
This article reflects my current understanding; I would love to know what you think ✌️