Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.examples;

import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.synadia.chaos.ChaosArguments;
import io.synadia.chaos.ChaosRunner;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

import static io.synadia.chaos.ChaosUtils.out;

public class ChaosRunnerSpecificPortExample {
private static final int SPECIFIC_PORT = 4222;
private static final int SERVER_COUNT = 3; // 1, 3, 5
private static final long DELAY = 3000; // the delay to bring a server down
private static final long INITIAL_DELAY = 3000; // the delay to bring a server down the first time
private static final long DOWN_TIME = 3000; // how long before bringing the server up
private static final int HEALTH_CHECK_DELAY = 1000;

private static final int NUM_CONNECTIONS = 5;

public static void main(String[] args) throws Exception {
ChaosArguments arguments = new ChaosArguments()
.servers(SERVER_COUNT)
.specificPort(SPECIFIC_PORT)
.workDirectory("C:\\temp\\chaos-runner")
.serverNamePrefix("cr-example-server")
.clusterName("cr-example-cluster")
.delay(DELAY)
.initialDelay(INITIAL_DELAY)
.downTime(DOWN_TIME)
// this is done last so anything on the command line
// is used over the hard coded items.
.args(args);

ChaosRunner runner = ChaosRunner.start(arguments);

// just give the servers a little time to be ready be first connect
Thread.sleep(1000);

String[] urls = runner.getConnectionUrls();
out("Connection Urls");
for (String url : urls) {
out(" ", url);
}

@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
List<Connection> connections = new ArrayList<>(urls.length);
for (int i = 0; i < NUM_CONNECTIONS; i++) {
String connectionName = "Conn" + (i + 1);
Options options = Options.builder().servers(urls)
.connectionListener(new ChaosConnectionListener(connectionName))
.errorListener(new ChaosErrorListener(connectionName))
.build();

Connection connection = Nats.connect(options);
connections.add(connection);
}

int[] ports = runner.getConnectionPorts();
int[] monitorPorts = runner.getMonitorPorts();
boolean hasMonitor = monitorPorts[0] > 0;

String[] hzs = new String[ports.length];
while (true) {
Thread.sleep(HEALTH_CHECK_DELAY);
if (hasMonitor) {
boolean changed = false;
for (int i = 0; i < monitorPorts.length; i++) {
String hz = readHealthz(monitorPorts[i]);
if (!hz.equals(hzs[i])) {
changed = true;
hzs[i] = hz;
}
}
if (changed) {
out("HealthZ");
for (int i = 0; i < monitorPorts.length; i++) {
int port = ports[i];
int mport = monitorPorts[i];
out(" ", port + "/" + mport, hzs[i]);
}
}
}
}
}

private static String readHealthz(int port) {
return readEndpoint(port, "healthz");
}

private static String readEndpoint(int port, String endpoint) {
String sUrl = "http://localhost:" + port + "/" + endpoint;
try {
URL url = new URL(sUrl);
InputStream inputStream = url.openStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

boolean first = true;
String line;
StringBuilder content = new StringBuilder();
while ((line = reader.readLine()) != null) {
if (first) {
first = false;
}
else {
content.append(System.lineSeparator());
}
content.append(line);
}
reader.close();
return content.toString().trim();
}
catch (IOException e) {
return e.getMessage();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class ChaosArguments {
long delay = 5_000;
long downTime = 5_000;
boolean random = false;
int specificPort = -1;
int port = 4222;
int listen = 4232;
int monitor = 4282;
Expand Down Expand Up @@ -92,6 +93,11 @@ public ChaosArguments port(int port) {
return this;
}

public ChaosArguments specificPort(int port) {
this.specificPort = port;
return this;
}

public ChaosArguments listen(int listen) {
this.listen = listen;
return this;
Expand Down Expand Up @@ -138,6 +144,9 @@ public ChaosArguments args(String[] args) {
case "--port":
port(Integer.parseInt(args[++x]));
break;
case "--sport":
specificPort(Integer.parseInt(args[++x]));
break;
case "--listen":
listen(Integer.parseInt(args[++x]));
break;
Expand Down
32 changes: 29 additions & 3 deletions chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ChaosRunner {
public final long delay;
public final long downTime;
public final boolean random;
public final int specificPort;
public final int port;
public final int listen;
public final int monitor;
Expand Down Expand Up @@ -101,15 +102,23 @@ private void scheduleUp() {

private void downTask() {
try {
if (random) {
if (specificPort != -1) {
for (int i = 0; i < natsServerRunners.size(); i++) {
NatsServerRunner nsr = natsServerRunners.get(i);
if (nsr.getPort() == specificPort) {
downIx = i;
break;
}
}
}
else if (random) {
downIx = ThreadLocalRandom.current().nextInt(servers);
}

NatsServerRunner runner = natsServerRunners.remove(downIx);
printer.out(CR_LABEL, "DOWN", runner.getPort());
printer.out(CR_LABEL, "DOWN", runner.getPort());
clusterInserts.add(clusterInserts.remove(downIx));
runner.close();

scheduleUp();
}
catch (Throwable e) {
Expand Down Expand Up @@ -179,12 +188,16 @@ else if (!a.workDirectory.toFile().exists()) {
this.delay = a.delay;
this.downTime = a.downTime;
this.random = a.random;
this.specificPort = a.specificPort;
this.port = a.port;
this.listen = a.listen;
this.monitor = a.monitor;

natsServerRunners = new ArrayList<>();
if (servers == 1) {
if (specificPort != -1 && specificPort != port) {
throw new IllegalArgumentException("Invalid specific port");
}
List<String> inserts = new ArrayList<>();
ClusterNode cn;
Path jsStorePath = Paths.get(jsStoreDirBase.toString(), "" + port);
Expand Down Expand Up @@ -217,6 +230,19 @@ else if (!a.workDirectory.toFile().exists()) {
}
else {
List<ClusterNode> cns = createNodes(servers, clusterName, serverNamePrefix, jsStoreDirBase, DEFAULT_HOST, port, listen, monitor < 1 ? null : monitor);
if (specificPort != -1) {
boolean found = false;
for (ClusterNode cn : cns) {
if (cn.port == specificPort) {
found = true;
break;
}
}
if (!found) {
throw new IllegalArgumentException("Invalid specific port");
}
}

clusterInserts = createClusterInserts(cns);
}

Expand Down