-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNameServer.java
More file actions
421 lines (374 loc) · 17.2 KB
/
NameServer.java
File metadata and controls
421 lines (374 loc) · 17.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import exceptions.*;
import networkMessages.*;
public class NameServer {
// Identity of this name server. Assigned by loadState(): restored from the state file when
// one can be read, otherwise a freshly generated random UUID.
private String uuid;
/**
* Target TOTAL number of copies of each chunk (the first copy included), i.e. not the number
* of extra replicas added on top of a primary. writeChunk() stops sending once this many
* copies have been stored.
*/
private final static short replication = 3;
// Metadata of every known file, keyed by the path name given at creation (see newFile()).
private final ConcurrentHashMap<String, FileMetadata> fileMap = new ConcurrentHashMap<>();
// Persisted and restored together with fileMap, but no method of this class adds entries to it.
// NOTE(review): presumably meant to track under-replicated files; key semantics to be confirmed.
private final ConcurrentHashMap<String, FileMetadata> missingReplicas = new ConcurrentHashMap<>();
// Registered data servers, keyed by the UUID they announce in RegisterDataServerMessage.
// Not part of the persisted state: data servers must register again after a restart.
private final ConcurrentHashMap<String, ServerStatus> dataServers = new ConcurrentHashMap<>();
// Sum of the free space announced by data servers at registration time.
// It is only ever incremented (in registerDataServer) and is not read anywhere in this class.
private long totalFreeSpace = 0;
/**
* The next file ID to be used. File IDs are never reused: this counter only increases.
* It is saved in, and restored from, the state file.
*/
private long currentFileIndex = 1;
// Name of the file holding the serialized state; a relative path, so it is resolved against
// the working directory of the process.
private static final String STATE_FILE = "nameserver_state.dat";
public NameServer() {
loadState();
// Register shutdown hook to save state on exit
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
closeAllDataServerConnections();
saveState();
}));
}
/** Asks every currently registered data server entry to close its connection. */
private void closeAllDataServerConnections() {
    dataServers.values().forEach(registered -> registered.closeConnection());
}
// Snapshot of the name server's persistent state. saveState() serializes one instance to
// STATE_FILE and loadState() reads it back, so that a server restarted with the same file
// resumes with the same files, missing-replica records, UUID and file-ID counter.
private static class NameServerState implements Serializable {
// NOTE(review): no explicit serialVersionUID is declared (the declaration below is commented
// out). Java serialization then derives the UID from the class shape (fields, constructor
// signature, modifiers), so editing this class can make existing state files unreadable.
// Enabling the declaration below would itself change the UID unless 1L happens to match.
//@Serial
//private static final long serialVersionUID = 1L;
// File metadata keyed by path name (content of NameServer.fileMap at save time).
Map<String, FileMetadata> fileMap;
// Content of NameServer.missingReplicas at save time.
Map<String, FileMetadata> missingReplicas;
// UUID of the name server that wrote the snapshot.
String uuid;
// Next file ID to hand out (NameServer.currentFileIndex at save time).
long currentFileIndex;
// Stores the given references as-is: the maps are not copied, so a snapshot built from the
// live maps keeps pointing at them.
NameServerState(Map<String, FileMetadata> fileMap, Map<String, FileMetadata> missingReplicas,
String uuid, long currentFileIndex) {
this.fileMap = fileMap;
this.missingReplicas = missingReplicas;
this.uuid = uuid;
this.currentFileIndex = currentFileIndex;
}
}
/**
 * Persists the current state (file map, missing replicas, UUID, file-ID counter) to
 * {@link #STATE_FILE}.
 *
 * <p>The snapshot is first written to a temporary sibling file and then moved over the
 * real state file, so a crash or a full disk in the middle of a save never destroys the
 * previous good state. Saves are serialized because this method is called from the
 * auto-save timer thread, from the shutdown hook and from {@code start()}'s cleanup,
 * which may overlap.
 *
 * <p>Failures are reported on {@code System.err} and otherwise ignored (best effort).
 */
private void saveState() {
    // The Class object of the private nested state class is only reachable from this class,
    // which makes it a safe dedicated lock for "one writer of the state file at a time".
    synchronized (NameServerState.class) {
        Path target = Path.of(STATE_FILE);
        Path temp = Path.of(STATE_FILE + ".tmp");
        try {
            NameServerState state = new NameServerState(fileMap, missingReplicas, uuid, currentFileIndex);
            try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(temp.toFile()))) {
                oos.writeObject(state);
            }
            try {
                Files.move(temp, target,
                        java.nio.file.StandardCopyOption.REPLACE_EXISTING,
                        java.nio.file.StandardCopyOption.ATOMIC_MOVE);
            } catch (java.nio.file.AtomicMoveNotSupportedException e) {
                // File system cannot rename atomically: fall back to a plain replace.
                Files.move(temp, target, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
            }
            System.out.println("State saved to " + STATE_FILE);
        } catch (IOException e) {
            System.err.println("Failed to save state: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
/**
 * Restores the state saved by a previous run from {@link #STATE_FILE}.
 *
 * <p>When the file does not exist, cannot be read, or does not contain a
 * {@code NameServerState}, the server starts fresh with a newly generated UUID and empty
 * maps. A stored {@code null} UUID is likewise replaced by a new one.
 */
private void loadState() {
    Path statePath = Path.of(STATE_FILE);
    if (!Files.exists(statePath)) {
        // Generate UUID if no state file exists
        this.uuid = UUID.randomUUID().toString();
        System.out.println("No state file found. Starting fresh with UUID: " + uuid);
        return;
    }
    try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(STATE_FILE))) {
        Object stored = ois.readObject();
        // A blind cast would throw an uncaught ClassCastException out of the constructor
        // if the file holds some other serialized object; treat that as an unreadable file.
        if (!(stored instanceof NameServerState state)) {
            throw new IOException("unexpected content in " + STATE_FILE);
        }
        this.uuid = state.uuid != null ? state.uuid : UUID.randomUUID().toString();
        this.currentFileIndex = state.currentFileIndex;
        if (state.fileMap != null) {
            fileMap.putAll(state.fileMap);
        }
        if (state.missingReplicas != null) {
            missingReplicas.putAll(state.missingReplicas);
        }
        System.out.println("State loaded from " + STATE_FILE);
        System.out.println(" UUID: " + uuid);
        System.out.println(" Files: " + fileMap.size());
        System.out.println(" Missing replicas: " + missingReplicas.size());
        System.out.println(" Current file index: " + currentFileIndex);
    } catch (IOException | ClassNotFoundException e) {
        System.err.println("Failed to load state: " + e.getMessage());
        System.err.println("Starting fresh...");
        this.uuid = UUID.randomUUID().toString();
    }
}
/**
 * Registers a new, empty file under the given path and assigns it the next file ID.
 *
 * <p>ID allocation is done under a lock: clients are served on one thread each, and an
 * unsynchronized read-then-increment of {@code currentFileIndex} would let two files
 * created at the same time receive the same ID.
 *
 * @param pathName path name of the file to create
 * @return the ID assigned to the new file
 * @throws FileExists if a file is already registered under {@code pathName}; no ID is
 *         consumed in that case
 */
private long newFile(String pathName) throws FileExists {
    synchronized (fileMap) {
        long id = currentFileIndex;
        if (fileMap.putIfAbsent(pathName, new FileMetadata(id, pathName)) != null) {
            throw new FileExists();
        }
        currentFileIndex = id + 1;
        return id;
    }
}
/**
 * Stores a chunk on the registered data servers and records in the file metadata which
 * servers hold it.
 *
 * <p>Servers are tried one after another, in the order given by
 * {@code Quickselect.quickselect} over a snapshot of the registered servers, until
 * {@code replication} copies have been stored or every server has been tried once.
 * A server whose send fails is skipped. Storing fewer than {@code replication} copies is
 * not an error as long as at least one copy was written.
 *
 * @param file  the metadata object of the file the chunk belongs to
 * @param chunk the chunk of data to write
 * @throws Exception if no data server is registered, or if no server accepted the chunk
 */
private void writeChunk(FileMetadata file, Chunk chunk) throws Exception {
    List<ServerStatus> candidates = new ArrayList<>(dataServers.values());
    if (candidates.isEmpty()) {
        throw new Exception("No data servers available");
    }
    file.setChunkHash(chunk);
    ServerStatus[] ranking = candidates.toArray(new ServerStatus[0]);
    int storedCopies = 0;
    for (int attempt = 0; attempt < candidates.size() && storedCopies < replication; attempt++) {
        ServerStatus target = Quickselect.quickselect(ranking, attempt);
        try {
            // TODO: send to the servers in parallel
            sendChunkToDataServer(target, chunk);
            storedCopies++;
            file.addServerToChunk(chunk.chunkID, target.uuid);
        } catch (RuntimeException e) {
            // This server was evidently not available; the loop moves on to another one.
            e.printStackTrace();
        }
    }
    if (storedCopies == 0) {
        throw new Exception("Could not write the file");
    }
}
/**
 * Runs the name server: schedules a periodic state save, then accepts connections on the
 * given port forever, handling each one on its own thread.
 *
 * <p>This method only returns by throwing. On the way out it cancels the auto-save timer,
 * closes the data-server connections and saves the state one last time.
 *
 * @param port TCP port to listen on
 * @throws IOException if the server socket cannot be opened or accepting a connection fails
 */
public void start(int port) throws IOException {
    // 3 minutes = 180000 milliseconds, used both as initial delay and as period.
    final long autoSaveIntervalMs = 180000;
    Timer saveTimer = new Timer(true); // daemon thread
    TimerTask autoSave = new TimerTask() {
        @Override
        public void run() {
            saveState();
        }
    };
    saveTimer.scheduleAtFixedRate(autoSave, autoSaveIntervalMs, autoSaveIntervalMs);
    System.out.println("NameServer started with UUID: " + uuid);
    System.out.println("Auto-save enabled (every 3 minutes)");
    try (ServerSocket listener = new ServerSocket(port)) {
        for (;;) {
            Socket connection = listener.accept();
            Thread worker = new Thread(() -> handleClient(connection));
            worker.start();
        }
    } finally {
        // The accept loop was left: stop auto-saving, drop the persistent data-server
        // connections and persist the final state.
        saveTimer.cancel();
        closeAllDataServerConnections();
        saveState();
    }
}
/**
 * Serves one incoming connection until the peer disconnects, sends {@code EndOfFile}, or
 * registers itself as a data server.
 *
 * <p>Messages are read one at a time and dispatched on their type:
 * <ul>
 *   <li>{@code CreateFileMessage}: create the file, reply Ok or "File exists";</li>
 *   <li>{@code GetMetadataMessage}: reply whether the file exists and its ID;</li>
 *   <li>{@code Chunk}: store the chunk on the data servers, reply Ok or an error;</li>
 *   <li>{@code EndOfFile}: reply Ok and end the session;</li>
 *   <li>{@code RegisterDataServerMessage}: hand the connection over to the data-server
 *       registry and stop reading from it here;</li>
 *   <li>{@code ReadFile}: stream the file's chunks back, or reply with an error;</li>
 *   <li>anything else: reply "Unknown message".</li>
 * </ul>
 *
 * <p>The socket and its streams are closed on exit unless the connection now belongs to a
 * registered data server, in which case it stays open for later chunk traffic.
 *
 * @param client the accepted connection
 */
private void handleClient(Socket client) {
    boolean isADataServer = false;
    ObjectOutputStream oos = null;
    ObjectInputStream ois = null;
    try {
        oos = new ObjectOutputStream(client.getOutputStream());
        ois = new ObjectInputStream(client.getInputStream());
        Object obj;
        while (true) {
            try {
                obj = ois.readObject();
            } catch (EOFException e) {
                // Client closed connection normally
                System.out.println("Connection closed normally");
                break;
            }
            if (obj instanceof CreateFileMessage msg) {
                try {
                    newFile(msg.fileName);
                    oos.writeObject(new OkMessage());
                    oos.flush();
                } catch (FileExists e) {
                    oos.writeObject(new ErrorMessage("File exists", msg.fileName));
                    oos.flush();
                }
            } else if (obj instanceof GetMetadataMessage msg) {
                FileMetadata metadata = fileMap.get(msg.fileName);
                if (metadata == null) {
                    oos.writeObject(new MetadataResponse(false, 0L));
                } else {
                    oos.writeObject(new MetadataResponse(true, metadata.id));
                }
                oos.flush();
            } else if (obj instanceof Chunk chunk) {
                FileMetadata metadata = fileMap.get(chunk.fileName);
                if (metadata == null) {
                    oos.writeObject(new ErrorMessage("Unknown file", chunk.fileName, chunk.chunkID));
                    oos.flush();
                    continue;
                }
                try {
                    writeChunk(metadata, chunk);
                    oos.writeObject(new OkMessage());
                    oos.flush();
                } catch (Exception e) {
                    oos.writeObject(new ErrorMessage("Write failed: " + e.getMessage(), chunk.fileName, chunk.chunkID));
                    oos.flush();
                }
            } else if (obj instanceof EndOfFile) { // For future use
                oos.writeObject(new OkMessage());
                oos.flush();
                break;
            } else if (obj instanceof RegisterDataServerMessage msg) {
                registerDataServer(msg, client.getRemoteSocketAddress().toString(), client, ois, oos);
                updateLastSeen(msg.uuid);
                // From now on the registry owns the socket and its streams.
                isADataServer = true;
                break;
            } else if (obj instanceof ReadFile readFile) {
                try {
                    readFile(readFile.pathName, readFile.offset, oos, ois);
                } catch (FileNotFound e) {
                    oos.writeObject(new ErrorMessage("The file you requested does not exist.", readFile.pathName));
                } catch (IOException e) {
                    oos.writeObject(new ErrorMessage("An internal network IO error occured, sorry"));
                }
                // Push the end-of-file marker or the error reply out of the stream buffer;
                // without this the client can block waiting for bytes that were never sent.
                oos.flush();
            } else {
                oos.writeObject(new ErrorMessage("Unknown message"));
                oos.flush();
            }
        }
    } catch (ClassNotFoundException e) {
        // This should not happen, but just in case.
        System.err.println("A unknown message has been received, coming from something else. (Maybe different version even if there is only one)");
    } catch (Exception e) {
        System.err.println("Error handling client: " + e.getMessage());
        e.printStackTrace();
    } finally {
        // If it is not a connection initialized by a data server, we close it.
        // The streams are still null when their construction failed, hence the
        // null-tolerant helper: the socket must be closed in every case.
        if (!isADataServer) {
            closeQuietly(ois);
            closeQuietly(oos);
            closeQuietly(client);
        }
    }
}

/** Closes the resource if it is non-null; a failure to close is deliberately ignored. */
private static void closeQuietly(java.io.Closeable resource) {
    if (resource == null) {
        return;
    }
    try {
        resource.close();
    } catch (IOException ignored) {
        // Best-effort cleanup: the connection is being discarded anyway.
    }
}
/**
 * Registers a data server and keeps its connection for later chunk traffic.
 *
 * <p>The entry is fully initialised (streams and socket attached) before it is published
 * in {@code dataServers}: other client threads pick servers from that map concurrently
 * and would otherwise be able to see an entry whose streams are still {@code null}.
 *
 * <p>NOTE(review): when a server registers again under the same UUID, the previous entry
 * is simply replaced; its connection is not closed here and its free space stays counted
 * in {@code totalFreeSpace}.
 *
 * @param msg           the registration request (UUID, address, port, space figures)
 * @param remoteAddress textual remote socket address, used when the message carries no IP
 * @param socket        the connection the request arrived on
 * @param ois           object input stream of that connection
 * @param oos           object output stream of that connection
 * @throws IOException if the acknowledgement cannot be sent
 */
private void registerDataServer(RegisterDataServerMessage msg, String remoteAddress, Socket socket, ObjectInputStream ois, ObjectOutputStream oos) throws IOException {
    // Use IP from message (which is the DataServer's local IP) or fallback to remote address
    String ipAddress = msg.ipAddress != null && !msg.ipAddress.isEmpty()
            ? msg.ipAddress
            : extractIPFromAddress(remoteAddress);
    ServerStatus status = new ServerStatus(ipAddress, msg.port, msg.uuid, msg.freeSpace, msg.occupiedSpace);
    status.lastSeen = System.currentTimeMillis() / 1000; // Unix timestamp
    status.setOis(ois);
    status.setOos(oos);
    status.setSocket(socket);
    // Publish only now that the connection is usable.
    dataServers.put(msg.uuid, status);
    totalFreeSpace += msg.freeSpace;
    oos.writeObject(new OkMessage());
    oos.flush();
    System.out.println("Registered DataServer: " + msg.uuid + " at " + ipAddress + ":" + msg.port +
            " (free: " + msg.freeSpace + ", occupied: " + msg.occupiedSpace + ")");
}
/**
 * Stamps the data server registered under the given UUID with the current time, in whole
 * seconds since the Unix epoch. Does nothing when no such server is registered.
 *
 * @param uuid UUID of the data server that was just heard from
 */
private void updateLastSeen(String uuid) {
    ServerStatus known = dataServers.get(uuid);
    if (known == null) {
        return;
    }
    long nowSeconds = System.currentTimeMillis() / 1000; // Unix timestamp
    known.lastSeen = nowSeconds;
}
/**
 * Extracts the IP address from the textual form of a socket address.
 *
 * <p>Accepted shapes, as produced by {@code SocketAddress.toString()}:
 * <ul>
 *   <li>{@code "/127.0.0.1:12345"}: leading slash, IPv4, port;</li>
 *   <li>{@code "host/127.0.0.1:12345"}: a host name before the slash is discarded;</li>
 *   <li>{@code "/[0:0:0:0:0:0:0:1]:12345"}: bracketed IPv6, brackets removed;</li>
 *   <li>{@code "/0:0:0:0:0:0:0:1:12345"}: unbracketed IPv6, the last {@code :segment} is
 *       taken to be the port.</li>
 * </ul>
 * A string without any colon is returned unchanged apart from the slash prefix. An
 * unbracketed IPv6 address <em>without</em> a port cannot be told apart from one with a
 * port, so callers are expected to pass an address that carries a port.
 *
 * @param address textual socket address; must not be {@code null}
 * @return the IP part of {@code address}
 */
private static String extractIPFromAddress(String address) {
    // Keep only what follows the last '/', i.e. drop "" or "hostname" in "hostname/ip:port".
    int slash = address.lastIndexOf('/');
    String hostPort = slash >= 0 ? address.substring(slash + 1) : address;

    // Bracketed IPv6 literal: the address is exactly what sits between the brackets.
    if (hostPort.startsWith("[")) {
        int closing = hostPort.indexOf(']');
        if (closing > 0) {
            return hostPort.substring(1, closing);
        }
    }

    // Everything before the last colon is the address; splitting at the FIRST colon
    // would cut an IPv6 address after its first group.
    int lastColon = hostPort.lastIndexOf(':');
    if (lastColon > 0) {
        return hostPort.substring(0, lastColon);
    }
    return hostPort;
}
/**
 * Sends one chunk to a data server over its persistent connection and waits for the
 * server's acknowledgement.
 *
 * <p>The exchange runs under the server's connection lock so that requests and replies of
 * concurrent callers do not interleave on the shared streams. On success the server's
 * last-seen time and its local free/occupied space figures are updated.
 *
 * @param server the data server to send to
 * @param chunk  the chunk to store
 * @throws RuntimeException if the exchange fails, or the server answers with an error or
 *         an unexpected message
 */
private void sendChunkToDataServer(ServerStatus server, Chunk chunk) {
    synchronized (server.getConnectionLock()) {
        ObjectOutputStream oos = server.getOos();
        ObjectInputStream ois = server.getOis();
        Object response;
        try {
            oos.writeObject(chunk);
            // An ObjectOutputStream remembers every object it has written so it can emit
            // back-references. On a long-lived connection that would keep every chunk ever
            // sent reachable; reset() drops those references on both ends of the stream.
            oos.reset();
            oos.flush();
            response = ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to send chunk to DataServer " + server.uuid + ": " + e.getMessage(), e);
        }
        // Reply handling sits outside the try so that the errors raised below are not
        // caught and wrapped a second time.
        if (response instanceof OkMessage) {
            updateLastSeen(server.uuid);
            // Approximate bookkeeping: counts the payload length only, which need not be
            // the exact space the data server uses to store the chunk.
            server.freeSpace -= chunk.getData().length;
            server.occupiedSpace += chunk.getData().length;
        } else if (response instanceof ErrorMessage error) {
            throw new RuntimeException("DataServer " + server.uuid + " error: " + error.message);
        } else {
            throw new RuntimeException("Unexpected response from DataServer " + server.uuid);
        }
    }
}
/**
 * Streams a file to a client, chunk by chunk, starting at a byte offset, and finishes
 * with an {@code EndOfFile} marker.
 *
 * <p>For every chunk from the one containing {@code offset} onwards, the replicas recorded
 * in the file metadata are asked in turn until one returns a chunk that passes
 * {@code FileMetadata.validateChunk}. Replicas whose data server is not currently
 * registered, or whose connection fails, are skipped. The first chunk sent is trimmed so
 * that it starts exactly at {@code offset}.
 *
 * @param filePath   path name of the file to read
 * @param offset     the offset, in bytes, to read from (0 means from the beginning)
 * @param client_oos stream the chunks and the end marker are written to
 * @param client_ois input stream of the client connection (not used by this method)
 * @throws FileNotFound             if no file is registered under {@code filePath}
 * @throws IOException              if writing to the client fails, or every replica of a
 *                                  chunk failed and at least one failed with an I/O error
 * @throws ClassNotFoundException   if a data server reply cannot be deserialized
 * @throws IllegalArgumentException if {@code offset} is negative
 * @throws RuntimeException         if no replica returned a valid chunk
 */
private void readFile(String filePath, long offset, ObjectOutputStream client_oos, ObjectInputStream client_ois) throws FileNotFound, IOException, ClassNotFoundException {
    FileMetadata fileMetadata = fileMap.get(filePath);
    if (fileMetadata == null) {
        throw new FileNotFound();
    }
    if (offset < 0) {
        throw new IllegalArgumentException("Negative read offset: " + offset);
    }
    final int startFromChunk = (int) (offset / FileMetadata.ChunkSize);
    final int offsetInFirstChunk = (int) (offset % FileMetadata.ChunkSize);
    // One entry per chunk; each entry lists the UUIDs of the data servers holding a replica.
    List<List<String>> chunksLocations = fileMetadata.getChunksLocations();

    for (int chunkIndex = startFromChunk; chunkIndex < chunksLocations.size(); chunkIndex++) {
        // Replicas of THIS chunk (not of the first requested chunk).
        List<String> chunkLocations = chunksLocations.get(chunkIndex);
        Chunk found = null;
        IOException lastFailure = null;

        for (int i = 0; i < chunkLocations.size() && found == null; i++) {
            ServerStatus dataServer = dataServers.get(chunkLocations.get(i));
            if (dataServer == null) {
                // Replica lives on a server that is not registered right now.
                continue;
            }
            Object dataServerResponse;
            // The lock keeps the request/reply pair atomic on the shared connection.
            synchronized (dataServer.getConnectionLock()) {
                try {
                    ObjectOutputStream oos = dataServer.getOos();
                    ObjectInputStream ois = dataServer.getOis();
                    oos.writeObject(new ReadChunk(fileMetadata.id, chunkIndex));
                    oos.flush();
                    dataServerResponse = ois.readObject();
                } catch (IOException e) {
                    // This replica is unreachable; remember why and try the next one.
                    lastFailure = e;
                    continue;
                }
            }
            if (dataServerResponse instanceof Chunk chunk && fileMetadata.validateChunk(chunk)) {
                found = chunk;
            }
        }

        if (found == null) {
            if (lastFailure != null) {
                throw new IOException("No replica of chunk " + chunkIndex + " could be read", lastFailure);
            }
            throw new RuntimeException("Chunk not found on any server. This should not happen.");
        }

        // Management of the offset inside the first chunk: drop the bytes before it.
        // Validation above ran on the untrimmed chunk, as stored.
        if (chunkIndex == startFromChunk && offsetInFirstChunk != 0) {
            var data = found.getData();
            int from = Math.min(offsetInFirstChunk, data.length);
            found = new Chunk(found.fileID, found.fileName, found.chunkID,
                    Arrays.copyOfRange(data, from, data.length));
        }
        client_oos.writeObject(found);
        // Drop the stream's back-reference to the chunk so that long transfers do not keep
        // every chunk sent so far in memory.
        client_oos.reset();
        client_oos.flush();
    }
    client_oos.writeObject(new EndOfFile());
    client_oos.flush();
}
}