
Totally Ordered Multicast (TO-Multicast)

This is the algorithm that guarantees every replica node applies file-system writes in the exact same order. It's the core of our consistency model.


The Problem It Solves

Consider two clients, connected to different nodes, uploading files at nearly the same instant:

Client A → Node 0: upload("report.pdf", dataA)
Client B → Node 2: upload("data.csv", dataB)

Without total ordering, this is what could happen:

                   Node 0        Node 1        Node 2
Processed first    report.pdf    data.csv      report.pdf
Processed second   data.csv      report.pdf    data.csv

Nodes 0 and 2 agree, but Node 1 processes them in reverse order. The replicas have now diverged. A client reading from Node 1 sees a different file system state than a client reading from Node 0.

TO-Multicast prevents this by making every node deliver messages in the same sequence.

The Algorithm — Step by Step

Step 1 — Broadcast

The node that receives the client's write operation increments its Lamport clock, wraps the operation in a ClockMessage, and calls receiveMulticast() on every node including itself.

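// `future` is defined outside this excerpt
long ts = clock.tick();
ClockMessage msg = new ClockMessage(nodeId, ts, fileOperation);
pendingOperations.put(msg.getMessageId(), future);

// peers includes this node, so the sender also enqueues its own message
for (ReplicaNodeInterface peer : peers) {
    peer.receiveMulticast(msg);
}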
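The snippet above stores a `future` in `pendingOperations` without showing what it is. One plausible reading is that it is a `CompletableFuture` that the client-facing call waits on and that the delivery path completes once the message has been delivered locally. The sketch below illustrates that pattern under that assumption; `PendingOpsSketch`, `register`, and `complete` are hypothetical names, not part of the code base shown here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks writes that were broadcast but not yet delivered.
class PendingOpsSketch {
    // messageId -> future completed when the message is delivered on this node
    private final Map<String, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();

    // Called by the originating node just before it multicasts the message.
    CompletableFuture<Boolean> register(String messageId) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pending.put(messageId, future);
        return future;
    }

    // Called from the delivery path. Returns false when this node did not
    // originate the message (nothing is waiting on it here).
    boolean complete(String messageId, boolean success) {
        CompletableFuture<Boolean> future = pending.remove(messageId);
        if (future == null) {
            return false;
        }
        future.complete(success);
        return true;
    }

    int size() {
        return pending.size();
    }
}
```

With this shape, the node that accepted the client's write can block on (or chain from) the returned future, while every other node simply finds no entry for that message ID and moves on.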

Step 2 — Receive & ACK

Each node that receives the multicast:

  1. Updates its Lamport clock using clock.update(msg.timestamp)
  2. Enqueues the message in a priority queue sorted by (timestamp, senderId)
  3. Broadcasts an ACK to every node with its updated clock value

public void receiveMulticast(ClockMessage msg) {
    long myClock = clock.update(msg.getTimestamp());
    messageQueue.addMessage(msg);

    for (ReplicaNodeInterface peer : peers) {
        peer.sendAck(nodeId, msg.getMessageId(), myClock);
    }
}
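The clock class itself is not shown on this page. A minimal sketch, assuming `tick()` and `update()` follow the standard Lamport rules (increment on a local event, and on receive jump past both the local value and the received timestamp), might look like this. `LamportClockSketch` is a hypothetical name, and the use of `AtomicLong` for thread safety is an assumption.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical Lamport clock following the standard rules.
class LamportClockSketch {
    private final AtomicLong time = new AtomicLong(0);

    // Local event or send: advance the clock by one and return the new value.
    long tick() {
        return time.incrementAndGet();
    }

    // Receive: move to max(local, received) + 1 and return the new value.
    // The result is always strictly greater than the received timestamp.
    long update(long receivedTimestamp) {
        return time.updateAndGet(local -> Math.max(local, receivedTimestamp) + 1);
    }

    long current() {
        return time.get();
    }
}
```

Under these rules the value returned by `update(msg.getTimestamp())` always exceeds the message's timestamp, so an ACK stamped with that value can satisfy the "strictly greater" delivery check in Step 3.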

Step 3 — Delivery Condition

A background thread (the delivery loop, running every 50ms) checks:

Is message at HEAD of priority queue?
AND
Have ALL N nodes sent an ACK for this message?
AND
Is every ACK timestamp STRICTLY GREATER than the message timestamp?

Only when all three conditions are met is the message delivered.

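// MessageQueue.getDeliverableMessages()
List<ClockMessage> deliverable = new ArrayList<>();

// PriorityQueue iteration order is unspecified, so walk a sorted copy.
List<ClockMessage> ordered = new ArrayList<>(queue);
ordered.sort(queue.comparator());

for (ClockMessage msg : ordered) {
    Map<Integer, Long> acks = ackMap.get(msg.getMessageId());
    if (acks == null || acks.size() < totalNodes) break; // not all ACKs yet: stop here

    boolean allGreater = acks.values().stream()
            .allMatch(ackTs -> ackTs > msg.getTimestamp());
    if (!allGreater) break; // some ACK's timestamp is <= msg.ts: stop here

    // Reached only while every earlier message was also deliverable,
    // so the result is always a prefix starting at the HEAD (total order).
    deliverable.add(msg);
}
return deliverable;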

Why Only the Head?

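The priority queue is sorted by (timestamp, senderId). Delivering a message that is not at the head would break total ordering, because every message ahead of it in the queue sorts earlier and must be delivered first. The ACK condition covers messages that have not arrived yet: a message with a smaller timestamp could still be in transit, and waiting for every node's ACK with a larger timestamp rules that out.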

This is the mechanism that guarantees all nodes deliver messages in the same order.
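To make the head-only rule concrete, here is a small self-contained sketch that returns the longest deliverable prefix of an already sorted message list and stops at the first message that is not ready. `HeadDeliverySketch`, its `Msg` class, and `deliverablePrefix` are hypothetical stand-ins for illustration, not the project's `MessageQueue`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class HeadDeliverySketch {
    // Minimal stand-in for ClockMessage (illustrative only).
    static class Msg {
        final String id;
        final long ts;
        final int senderId;

        Msg(String id, long ts, int senderId) {
            this.id = id;
            this.ts = ts;
            this.senderId = senderId;
        }
    }

    // `sorted` must already be in (timestamp, senderId) order.
    // `acks` maps messageId -> (nodeId -> ACK timestamp).
    static List<Msg> deliverablePrefix(List<Msg> sorted,
                                       Map<String, Map<Integer, Long>> acks,
                                       int totalNodes) {
        List<Msg> out = new ArrayList<>();
        for (Msg m : sorted) {
            Map<Integer, Long> perNode = acks.get(m.id);
            boolean ready = perNode != null
                    && perNode.size() >= totalNodes
                    && perNode.values().stream().allMatch(t -> t > m.ts);
            if (!ready) {
                break; // the head is blocked, so everything behind it waits too
            }
            out.add(m);
        }
        return out;
    }
}
```

For example, with three queued messages where only the second is still missing an ACK, the method returns just the first one. The third stays queued even though it is fully acknowledged, because delivering it would skip over the second.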

Tie-Breaking

What if two nodes generate messages with the same timestamp?

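Lamport clocks guarantee only that if one event causally precedes another, it gets the smaller timestamp. They make no such promise for concurrent events (neither caused the other), so two nodes can produce messages with the same timestamp. Our queue sorts by (timestamp, senderId), breaking ties deterministically by node ID: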

Message A: ts=5, senderId=0
Message B: ts=5, senderId=1
→ A is ordered before B (0 < 1)

Message C: ts=5, senderId=2
→ A < B < C (0 < 1 < 2)

The key: the tie-break is deterministic. Every node uses the same comparator, so every node produces the same ordering.
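The comparator is not shown on this page. A minimal sketch of a (timestamp, senderId) ordering, assuming plain `long` timestamps and `int` node IDs, could be written as follows; `TieBreakSketch`, `Msg`, and `drain` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class TieBreakSketch {
    // Minimal stand-in for ClockMessage (illustrative only).
    static class Msg {
        final long ts;
        final int senderId;

        Msg(long ts, int senderId) {
            this.ts = ts;
            this.senderId = senderId;
        }
    }

    // Order by timestamp first, then by sender ID to break ties.
    static final Comparator<Msg> ORDER =
            Comparator.<Msg>comparingLong(m -> m.ts).thenComparingInt(m -> m.senderId);

    // Puts the arrivals into a priority queue and polls them back out,
    // returning "ts:senderId" labels in queue order.
    static List<String> drain(List<Msg> arrivals) {
        PriorityQueue<Msg> queue = new PriorityQueue<>(ORDER);
        queue.addAll(arrivals);
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            Msg m = queue.poll();
            out.add(m.ts + ":" + m.senderId);
        }
        return out;
    }
}
```

Feeding the same set of messages to `drain` in two different arrival orders yields the same output list, which is the property the tie-break is meant to provide.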

The Data Structures

MessageQueue

public class MessageQueue {
    // Priority queue sorted by (timestamp, senderId)
    private final PriorityQueue<ClockMessage> queue;

    // messageId → (nodeId → ackTimestamp); read as ackMap in getDeliverableMessages()
    private final Map<String, Map<Integer, Long>> ackMap;

    // Number of nodes in the cluster (used to tell when all ACKs are in)
    private final int totalNodes;
}
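How ACKs get into the nested map is not shown here. Because every node broadcasts its ACK to every other node, an ACK for a message can reach a node before the message itself does, so the table probably needs to accept ACKs for message IDs it has not seen yet. The sketch below assumes that behaviour; `AckTableSketch`, `recordAck`, `hasAllAcks`, and `forget` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical ACK bookkeeping: messageId -> (nodeId -> ACK timestamp).
class AckTableSketch {
    private final Map<String, Map<Integer, Long>> acks = new ConcurrentHashMap<>();
    private final int totalNodes;

    AckTableSketch(int totalNodes) {
        this.totalNodes = totalNodes;
    }

    // Creates the per-message map on first use, so an ACK that arrives
    // before its message is still recorded. A repeated ACK from the same
    // node overwrites the earlier entry instead of being counted twice.
    void recordAck(String messageId, int fromNode, long ackTimestamp) {
        acks.computeIfAbsent(messageId, id -> new ConcurrentHashMap<>())
            .put(fromNode, ackTimestamp);
    }

    boolean hasAllAcks(String messageId) {
        Map<Integer, Long> perNode = acks.get(messageId);
        return perNode != null && perNode.size() >= totalNodes;
    }

    // Drop the entry once the message has been delivered.
    void forget(String messageId) {
        acks.remove(messageId);
    }
}
```

Keying the inner map by node ID is what makes `size() >= totalNodes` a safe completeness check: duplicates cannot inflate the count.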

ClockMessage

public class ClockMessage implements Serializable {
    private final String messageId;        // UUID, unique identifier
    private final int senderId;            // which node sent this
    private final long timestamp;          // Lamport clock value at send time
    private final FileOperation operation; // the actual file operation
}

Delivery Loop

// Inside ReplicaNode: runs in a ScheduledExecutorService
deliveryExecutor.scheduleAtFixedRate(() -> {
    List<ClockMessage> deliverable = messageQueue.getDeliverableMessages();
    for (ClockMessage msg : deliverable) {
        deliverMessage(msg);
    }
}, 0, 50, TimeUnit.MILLISECONDS);
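One detail worth keeping in mind with `scheduleAtFixedRate`: if any execution of the task throws, the executor suppresses all subsequent executions, which here would silently stop delivery. A common guard is to catch exceptions inside the task. The wrapper below is a minimal sketch of that idea; `GuardedTaskSketch` and `guarded` are hypothetical names, and counting failures in an `AtomicInteger` is only a placeholder for whatever logging the project uses.

```java
import java.util.concurrent.atomic.AtomicInteger;

class GuardedTaskSketch {
    // Wraps a task so that a RuntimeException is counted instead of
    // escaping to the executor and cancelling the periodic schedule.
    static Runnable guarded(Runnable task, AtomicInteger failures) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                failures.incrementAndGet(); // placeholder for real logging
            }
        };
    }
}
```

The delivery loop could then be scheduled as `deliveryExecutor.scheduleAtFixedRate(GuardedTaskSketch.guarded(task, failures), 0, 50, TimeUnit.MILLISECONDS)`, where `task` is the lambda shown above and `failures` is a counter owned by the node.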

Why This Works — The Proof

  1. Every node receives every message (broadcast to all)
  2. Every node orders messages identically (same comparator: timestamp then senderId)
  3. Every node delivers in queue order (only at the head)
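  4. Once every node has ACKed a message with a timestamp > msg.ts, no node can still send a message with a smaller timestamp; assuming messages between each pair of nodes arrive in the order they were sent (FIFO), no earlier message is still in flight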

Result: all nodes execute all writes in the same sequence → all replicas have identical file system state.


Next: Raft Leader Election

TO-Multicast handles write ordering. But who handles reads? And what happens when a node crashes? → Raft Leader Election