diff --git a/chaos-runner/src/examples/java/io/synadia/examples/ChaosRunnerSpecificPortExample.java b/chaos-runner/src/examples/java/io/synadia/examples/ChaosRunnerSpecificPortExample.java new file mode 100644 index 0000000..7a8a078 --- /dev/null +++ b/chaos-runner/src/examples/java/io/synadia/examples/ChaosRunnerSpecificPortExample.java @@ -0,0 +1,128 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.Connection; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.synadia.chaos.ChaosArguments; +import io.synadia.chaos.ChaosRunner; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +import static io.synadia.chaos.ChaosUtils.out; + +public class ChaosRunnerSpecificPortExample { + private static final int SPECIFIC_PORT = 4222; + private static final int SERVER_COUNT = 3; // 1, 3, 5 + private static final long DELAY = 3000; // the delay to bring a server down + private static final long INITIAL_DELAY = 3000; // the delay to bring a server down the first time + private static final long DOWN_TIME = 3000; // how long before bringing the server up + private static final int HEALTH_CHECK_DELAY = 1000; + + private static final int NUM_CONNECTIONS = 5; + + public static void main(String[] args) throws Exception { + ChaosArguments arguments = new ChaosArguments() + .servers(SERVER_COUNT) + .specificPort(SPECIFIC_PORT) + .workDirectory("C:\\temp\\chaos-runner") + .serverNamePrefix("cr-example-server") + .clusterName("cr-example-cluster") + .delay(DELAY) + .initialDelay(INITIAL_DELAY) + .downTime(DOWN_TIME) + // this is done last so anything on the command line + // is used over the hard coded items. + .args(args); + + ChaosRunner runner = ChaosRunner.start(arguments); + + // just give the servers a little time to be ready be first connect + Thread.sleep(1000); + + String[] urls = runner.getConnectionUrls(); + out("Connection Urls"); + for (String url : urls) { + out(" ", url); + } + + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + List connections = new ArrayList<>(urls.length); + for (int i = 0; i < NUM_CONNECTIONS; i++) { + String connectionName = "Conn" + (i + 1); + Options options = Options.builder().servers(urls) + .connectionListener(new ChaosConnectionListener(connectionName)) + .errorListener(new ChaosErrorListener(connectionName)) + .build(); + + Connection connection = Nats.connect(options); + connections.add(connection); + } + + int[] ports = runner.getConnectionPorts(); + int[] monitorPorts = runner.getMonitorPorts(); + boolean hasMonitor = monitorPorts[0] > 0; + + String[] hzs = new String[ports.length]; + while (true) { + Thread.sleep(HEALTH_CHECK_DELAY); + if (hasMonitor) { + boolean changed = false; + for (int i = 0; i < monitorPorts.length; i++) { + String hz = readHealthz(monitorPorts[i]); + if (!hz.equals(hzs[i])) { + changed = true; + hzs[i] = hz; + } + } + if (changed) { + out("HealthZ"); + for (int i = 0; i < monitorPorts.length; i++) { + int port = ports[i]; + int mport = monitorPorts[i]; + out(" ", port + "/" + mport, hzs[i]); + } + } + } + } + } + + private static String readHealthz(int port) { + return readEndpoint(port, "healthz"); + } + + private static String readEndpoint(int port, String endpoint) { + String sUrl = "http://localhost:" + port + "/" + endpoint; + try { + URL url = new URL(sUrl); + InputStream inputStream = url.openStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + + boolean first = true; + String line; + StringBuilder content = new StringBuilder(); + while ((line = reader.readLine()) != null) { + if (first) { + first = false; + } + else { + content.append(System.lineSeparator()); + } + content.append(line); + } + reader.close(); + return content.toString().trim(); + } + catch (IOException e) { + return e.getMessage(); + } + } +} diff --git a/chaos-runner/src/main/java/io/synadia/chaos/ChaosArguments.java b/chaos-runner/src/main/java/io/synadia/chaos/ChaosArguments.java index 7b5e1fd..c2951dd 100644 --- a/chaos-runner/src/main/java/io/synadia/chaos/ChaosArguments.java +++ b/chaos-runner/src/main/java/io/synadia/chaos/ChaosArguments.java @@ -18,6 +18,7 @@ public class ChaosArguments { long delay = 5_000; long downTime = 5_000; boolean random = false; + int specificPort = -1; int port = 4222; int listen = 4232; int monitor = 4282; @@ -92,6 +93,11 @@ public ChaosArguments port(int port) { return this; } + public ChaosArguments specificPort(int port) { + this.specificPort = port; + return this; + } + public ChaosArguments listen(int listen) { this.listen = listen; return this; @@ -138,6 +144,9 @@ public ChaosArguments args(String[] args) { case "--port": port(Integer.parseInt(args[++x])); break; + case "--sport": + specificPort(Integer.parseInt(args[++x])); + break; case "--listen": listen(Integer.parseInt(args[++x])); break; diff --git a/chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java b/chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java index cc6eeab..dc382e2 100644 --- a/chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java +++ b/chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java @@ -37,6 +37,7 @@ public class ChaosRunner { public final long delay; public final long downTime; public final boolean random; + public final int specificPort; public final int port; public final int listen; public final int monitor; @@ -101,15 +102,23 @@ private void scheduleUp() { private void downTask() { try { - if (random) { + if (specificPort != -1) { + for (int i = 0; i < natsServerRunners.size(); i++) { + NatsServerRunner nsr = natsServerRunners.get(i); + if (nsr.getPort() == specificPort) { + downIx = i; + break; + } + } + } + else if (random) { downIx = ThreadLocalRandom.current().nextInt(servers); } NatsServerRunner runner = natsServerRunners.remove(downIx); - printer.out(CR_LABEL, "DOWN", runner.getPort()); + printer.out(CR_LABEL, "DOWN", runner.getPort()); clusterInserts.add(clusterInserts.remove(downIx)); runner.close(); - scheduleUp(); } catch (Throwable e) { @@ -179,12 +188,16 @@ else if (!a.workDirectory.toFile().exists()) { this.delay = a.delay; this.downTime = a.downTime; this.random = a.random; + this.specificPort = a.specificPort; this.port = a.port; this.listen = a.listen; this.monitor = a.monitor; natsServerRunners = new ArrayList<>(); if (servers == 1) { + if (specificPort != -1 && specificPort != port) { + throw new IllegalArgumentException("Invalid specific port"); + } List inserts = new ArrayList<>(); ClusterNode cn; Path jsStorePath = Paths.get(jsStoreDirBase.toString(), "" + port); @@ -217,6 +230,19 @@ else if (!a.workDirectory.toFile().exists()) { } else { List cns = createNodes(servers, clusterName, serverNamePrefix, jsStoreDirBase, DEFAULT_HOST, port, listen, monitor < 1 ? null : monitor); + if (specificPort != -1) { + boolean found = false; + for (ClusterNode cn : cns) { + if (cn.port == specificPort) { + found = true; + break; + } + } + if (!found) { + throw new IllegalArgumentException("Invalid specific port"); + } + } + clusterInserts = createClusterInserts(cns); }