-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNode.java
More file actions
133 lines (108 loc) · 5.6 KB
/
Node.java
File metadata and controls
133 lines (108 loc) · 5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
 * A routing node that talks to the Node_Registry and to other nodes over
 * RabbitMQ.
 *
 * <p>On startup the node:
 * <ol>
 *   <li>publishes a {@code new_node} request to the {@code registry} queue and
 *       waits for the id assigned by the registry on a private reply queue;</li>
 *   <li>declares a queue named after that id and publishes an
 *       {@code obtain_connections} request to get its routing table
 *       (a {@code ':'}-separated list of integers);</li>
 *   <li>starts consuming from its own queue, printing messages addressed to it
 *       and forwarding every other message to the next hop from the table.</li>
 * </ol>
 *
 * <p>A shutdown hook publishes a {@code delete_node} message so the registry
 * learns that the node went away (e.g. on Ctrl+C).
 */
public class Node {
    private static final String REGISTRY_QUEUE = "registry";
    /** Placeholder id used until the registry has answered the registration request. */
    private static final String UNREGISTERED_ID = "0";
    /** Prefix of node queue names; destination ids are this prefix followed by a number. */
    private static final String NODE_PREFIX = "node";

    // These fields are written by main / the consumer callbacks and read by the
    // shutdown-hook thread, so they are volatile to guarantee visibility.
    // NOTE(review): consumer callbacks are presumably invoked on threads owned by
    // the RabbitMQ client rather than on the main thread - confirm against the
    // client documentation.
    private static volatile String id = UNREGISTERED_ID;
    private static volatile String corrId, corrId2;
    private static volatile int[] connections;

    /**
     * Connects to the broker on localhost, registers the node and starts
     * consuming registry replies.
     *
     * @param argv ignored
     * @throws Exception if the connection, the channels or the queues cannot be set up
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel send = connection.createChannel();
        Channel recv = connection.createChannel();
        // Server-named queue on which the registry replies to this node.
        String queueName = recv.queueDeclare().getQueue();
        final boolean durable = true;
        send.queueDeclare(REGISTRY_QUEUE, durable, false, false, null);
        id = UNREGISTERED_ID;

        // The shutdown hook catches Ctrl+C and tells the Node_Registry that the
        // node is deleted/disconnected.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    String currentId = id;
                    // Nothing to delete if the registry never assigned an id.
                    if (!UNREGISTERED_ID.equals(currentId)) {
                        AMQP.BasicProperties deleteProps = new AMQP.BasicProperties.Builder()
                                .appId("delete_node")
                                .build();
                        send.basicPublish("", REGISTRY_QUEUE, deleteProps,
                                currentId.getBytes(StandardCharsets.UTF_8));
                    }
                    System.out.println("Shutting down ...");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

        System.out.println("Node is running...");

        // Register the node by sending a message to the Node_Registry.
        corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties newProps = new AMQP.BasicProperties.Builder()
                .appId("new_node")
                .correlationId(corrId)
                .replyTo(queueName)
                .build();
        // The registration request carries no payload; send an explicit empty body.
        send.basicPublish("", REGISTRY_QUEUE, newProps, new byte[0]);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // May be null for messages that are not replies to our requests.
            String replyCorrId = delivery.getProperties().getCorrelationId();
            if (replyCorrId == null) {
                return;
            }
            if (replyCorrId.equals(corrId)) {
                // Reply to the registration request: the body is the assigned id.
                id = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("Node registered with id: " + id);
                // This queue will receive messages from other nodes.
                recv.queueDeclare(id, durable, false, false, null);
                // Request the connections with other nodes.
                corrId2 = UUID.randomUUID().toString();
                AMQP.BasicProperties connectionProps = new AMQP.BasicProperties.Builder()
                        .appId("obtain_connections")
                        .correlationId(corrId2)
                        .replyTo(queueName)
                        .build();
                send.basicPublish("", REGISTRY_QUEUE, connectionProps,
                        id.getBytes(StandardCharsets.UTF_8));
            } else if (replyCorrId.equals(corrId2)) {
                // Reply to the connections request: the body is the routing table.
                String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
                int[] table;
                try {
                    table = parseRouteTable(msg);
                } catch (NumberFormatException e) {
                    // Do not let a malformed reply escape from the consumer callback.
                    System.err.println("Ignoring malformed route table: " + msg);
                    return;
                }
                connections = table;
                System.out.println("Table Route");
                for (int link : table) {
                    System.out.print(link + "|");
                }
                System.out.println("");
                try {
                    normalRun(recv, send, id, table);
                } catch (Exception e) {
                    System.out.println("error");
                    // Keep the cause visible instead of swallowing it.
                    e.printStackTrace();
                }
            }
        };
        recv.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }

    /**
     * Starts consuming from this node's own queue. Messages whose
     * {@code destNode} header equals {@code id} are printed; all others are
     * republished, with their headers and body unchanged, to the next hop
     * taken from {@code connections}. Messages lacking the routing headers or
     * addressed to an unroutable destination are dropped with a diagnostic.
     *
     * @param recv        channel used to declare and consume this node's queue
     * @param send        channel used to forward messages to other nodes
     * @param id          id (and queue name) of this node
     * @param connections routing table; entry {@code n - 1} is the number of the
     *                    next node on the path to {@code node<n>}
     * @throws Exception if the queue cannot be declared or consumed
     */
    private static void normalRun(Channel recv, Channel send, String id, int[] connections) throws Exception {
        // This queue will receive messages from other nodes.
        recv.queueDeclare(id, true, false, false, null);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            Map<String, Object> headers = delivery.getProperties().getHeaders();
            Object srcNode = headers == null ? null : headers.get("srcNode");
            Object destNode = headers == null ? null : headers.get("destNode");
            if (srcNode == null || destNode == null) {
                System.err.println("Dropping message without srcNode/destNode headers");
                return;
            }
            String dest = destNode.toString();
            if (dest.equals(id)) {
                System.out.println("Message received from " + srcNode + " > " + message);
                return;
            }
            // Look up which node to send to in order to reach the destination.
            String nextNode = resolveNextNode(dest, connections);
            if (nextNode == null) {
                System.err.println("Dropping message: no route to " + dest);
                return;
            }
            // Keep the original headers so the destination travels with the message.
            AMQP.BasicProperties nextProps = new AMQP.BasicProperties.Builder().headers(headers).build();
            System.out.println("Redirecting message to " + nextNode);
            // Forward the original bytes so the payload is not re-encoded on the way.
            send.basicPublish("", nextNode, nextProps, delivery.getBody());
        };
        recv.basicConsume(id, true, deliverCallback, consumerTag -> {
        });
    }

    /**
     * Parses a {@code ':'}-separated list of integers into a routing table.
     *
     * @param msg the routing table as text
     * @return one entry per field of {@code msg}
     * @throws NumberFormatException if any field is not an integer
     */
    private static int[] parseRouteTable(String msg) {
        String[] entries = msg.split(":");
        int[] table = new int[entries.length];
        for (int i = 0; i < entries.length; i++) {
            table[i] = Integer.parseInt(entries[i]);
        }
        return table;
    }

    /**
     * Resolves the queue name of the next hop towards {@code destNode}.
     *
     * @param destNode    destination id; the text after the first
     *                    {@code NODE_PREFIX.length()} characters must be a
     *                    1-based node number
     * @param connections routing table indexed by node number minus one
     * @return the next node's queue name, or {@code null} if the destination is
     *         malformed or outside the routing table
     */
    private static String resolveNextNode(String destNode, int[] connections) {
        if (connections == null || destNode.length() <= NODE_PREFIX.length()) {
            return null;
        }
        final int nodeId;
        try {
            nodeId = Integer.parseInt(destNode.substring(NODE_PREFIX.length()));
        } catch (NumberFormatException e) {
            return null;
        }
        if (nodeId < 1 || nodeId > connections.length) {
            return null;
        }
        return NODE_PREFIX + connections[nodeId - 1];
    }
}