Node Layer — The Replica Server
The com.dfs.node package is where the distributed systems magic lives. ReplicaNode.java alone is ~750 lines and implements TO-Multicast, Raft, file I/O, and all security checks.
Class Overview
ReplicaNode — The Main Class
Field Groups
The fields are organized into logical groups:
public class ReplicaNode extends UnicastRemoteObject implements ReplicaNodeInterface {
// ---- Identity and storage ----
private final int nodeId;
private final String storageDir;
// ---- Cluster membership ----
private final NodeRegistry registry;
private final List<ReplicaNodeInterface> peers;
// ---- TO-Multicast components ----
private final LogicalClock clock;
private final MessageQueue messageQueue;
// ---- Security ----
private final NonceStore nonceStore;
// ---- Raft state ----
private final AtomicReference<RaftRole> role;
private final AtomicInteger currentTerm;
private volatile int votedFor;
// ...
}
Key Methods
handleClientOperation(FileOperation op, String token)
The entry point for all client requests. This method:
- Validates the session token via
authService.validateToken(token) - Validates the operation via
op.validate() - Checks nonce/timestamp via
nonceStore.isValid(op.getNonce(), op.getTimestamp())(writes only) - Routes reads to
executeLocally(op)if we're the leader, otherwise forwards to the leader - Routes writes to
broadcastAndWait(op)for TO-Multicast
@Override
public OperationResult handleClientOperation(FileOperation op, String sessionToken)
throws RemoteException {
// ---- Authentication check ----
if (!authService.validateToken(sessionToken)) {
return new OperationResult(false, "Unauthorized: invalid session token.");
}
// ---- Input validation ----
if (!op.validate()) {
return new OperationResult(false, "Invalid operation.");
}
// ---- Replay protection (writes only) ----
if (op.isWrite() && !nonceStore.isValid(op.getNonce(), op.getTimestamp())) {
return new OperationResult(false, "Replay attack detected.");
}
// ---- Route to read or write path ----
if (op.isWrite()) {
return broadcastAndWait(op);
} else {
// Forward reads to leader if we're not it
if (nodeId != currentLeader && currentLeader >= 0) {
return peers.get(currentLeader).handleClientOperation(op, sessionToken);
}
return executeLocally(op);
}
}
broadcastAndWait(FileOperation op)
The TO-Multicast entry point. Creates a ClockMessage, broadcasts it, and blocks until delivery:
private OperationResult broadcastAndWait(FileOperation op) throws RemoteException {
long ts = clock.tick();
ClockMessage msg = new ClockMessage(nodeId, ts, op);
CompletableFuture<OperationResult> future = new CompletableFuture<>();
pendingOperations.put(msg.getMessageId(), future);
// Send to ALL nodes (including self)
for (ReplicaNodeInterface peer : getAllPeers()) {
try { peer.receiveMulticast(msg); } catch (RemoteException e) { /* skip dead peers */ }
}
// Block until the delivery loop completes the future
try { return future.get(10, TimeUnit.SECONDS); }
catch (TimeoutException e) { return new OperationResult(false, "Operation timed out."); }
}
executeLocally(FileOperation op)
Performs the actual file-system operation. Called by the delivery loop (for writes) or directly (for leader reads):
private OperationResult executeLocally(FileOperation op) {
Path filePath = Paths.get(storageDir, op.getFilename());
switch (op.getOperationType()) {
case FileOperation.UPLOAD:
Files.write(filePath, op.getFileData());
return new OperationResult(true, "File uploaded successfully.");
case FileOperation.DOWNLOAD:
byte[] data = Files.readAllBytes(filePath);
return new OperationResult(true, "File downloaded.", data, null);
case FileOperation.DELETE:
Files.deleteIfExists(filePath);
return new OperationResult(true, "File deleted successfully.");
case FileOperation.RENAME:
Path newPath = Paths.get(storageDir, op.getNewFilename());
Files.move(filePath, newPath);
return new OperationResult(true, "File renamed successfully.");
case FileOperation.SEARCH:
// Walk storage directory, filter by keyword
List<String> matches = Files.list(Paths.get(storageDir))
.map(p -> p.getFileName().toString())
.filter(name -> name.contains(op.getFilename()))
.collect(Collectors.toList());
return new OperationResult(true, "Search complete.", null, matches);
case FileOperation.LIST:
List<String> files = Files.list(Paths.get(storageDir))
.map(p -> p.getFileName().toString())
.collect(Collectors.toList());
return new OperationResult(true, "List complete.", null, files);
}
}
MessageQueue — The TO-Multicast Algorithm
public class MessageQueue {
// Sorted by (timestamp, senderId) — deterministic ordering
private final PriorityQueue<ClockMessage> queue = new PriorityQueue<>(
Comparator.comparingLong(ClockMessage::getTimestamp)
.thenComparingInt(ClockMessage::getSenderId)
);
// messageId → (nodeId → ackTimestamp)
private final Map<String, Map<Integer, Long>> acks = new ConcurrentHashMap<>();
private final int totalNodes;
}
The getDeliverableMessages() method returns all messages at the front of the queue that have all ACKs with timestamps > msg.ts:
public List<ClockMessage> getDeliverableMessages() {
List<ClockMessage> result = new ArrayList<>();
for (ClockMessage msg : queue) {
Map<Integer, Long> msgAcks = acks.get(msg.getMessageId());
if (msgAcks == null || msgAcks.size() < totalNodes) break; // not enough ACKs
// Every ACK must be STRICTLY greater than the message timestamp
boolean allStrict = msgAcks.values().stream()
.allMatch(ackTs -> ackTs > msg.getTimestamp());
if (!allStrict) break;
result.add(msg);
}
return result;
}
Next: → Client & Auth Layers