From 8c7af0fd841b92a7bacff566a08c543f0b8cf2ec Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 7 Jul 2025 20:11:14 -0400 Subject: [PATCH] Chaos Runner Initial Form --- chaos-runner/build.gradle | 1 + chaos-runner/chaos.bat | 1 + .../examples/ChaosConnectionListener.java | 22 +++ .../synadia/examples/ChaosErrorListener.java | 64 +++++++ .../synadia/examples/ChaosRunnerExample.java | 57 ++++++ .../java/io/synadia/chaos/ChaosRunner.java | 180 ++++++++++++++++++ .../java/io/synadia/chaos/ChaosStarter.java | 180 ++++++++++++++++++ .../java/io/synadia/chaos/ChaosUtils.java | 44 +++++ 8 files changed, 549 insertions(+) create mode 100644 chaos-runner/chaos.bat create mode 100644 chaos-runner/src/examples/java/io/synadia/examples/ChaosConnectionListener.java create mode 100644 chaos-runner/src/examples/java/io/synadia/examples/ChaosErrorListener.java create mode 100644 chaos-runner/src/examples/java/io/synadia/examples/ChaosRunnerExample.java create mode 100644 chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java create mode 100644 chaos-runner/src/main/java/io/synadia/chaos/ChaosStarter.java create mode 100644 chaos-runner/src/main/java/io/synadia/chaos/ChaosUtils.java diff --git a/chaos-runner/build.gradle b/chaos-runner/build.gradle index f920c38..9d6dc60 100644 --- a/chaos-runner/build.gradle +++ b/chaos-runner/build.gradle @@ -39,6 +39,7 @@ repositories { dependencies { implementation 'io.nats:jnats:2.21.1' + implementation 'io.nats:jnats-server-runner:2.0.2-SNAPSHOT' testImplementation 'commons-codec:commons-codec:1.18.0' testImplementation 'io.nats:jnats-server-runner:1.2.8' diff --git a/chaos-runner/chaos.bat b/chaos-runner/chaos.bat new file mode 100644 index 0000000..5ad66d0 --- /dev/null +++ b/chaos-runner/chaos.bat @@ -0,0 +1 @@ +java -cp "build/libs/*" io.synadia.tools.ChaosRunner %1 %2 %3 %4 %5 %6 %7 %8 %9 diff --git a/chaos-runner/src/examples/java/io/synadia/examples/ChaosConnectionListener.java b/chaos-runner/src/examples/java/io/synadia/examples/ChaosConnectionListener.java new file mode 100644 index 0000000..cbe926b --- /dev/null +++ b/chaos-runner/src/examples/java/io/synadia/examples/ChaosConnectionListener.java @@ -0,0 +1,22 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.Connection; +import io.nats.client.ConnectionListener; + +import static io.synadia.chaos.ChaosUtils.report; + +public class ChaosConnectionListener implements ConnectionListener { + private final String reportLabel; + + public ChaosConnectionListener(String connectionName) { + this.reportLabel = "CL/" + connectionName; + } + + @Override + public void connectionEvent(Connection conn, Events type) { + report(reportLabel, type); + } +} diff --git a/chaos-runner/src/examples/java/io/synadia/examples/ChaosErrorListener.java b/chaos-runner/src/examples/java/io/synadia/examples/ChaosErrorListener.java new file mode 100644 index 0000000..b76cf71 --- /dev/null +++ b/chaos-runner/src/examples/java/io/synadia/examples/ChaosErrorListener.java @@ -0,0 +1,64 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.Connection; +import io.nats.client.Consumer; +import io.nats.client.JetStreamSubscription; +import io.nats.client.Message; +import io.nats.client.impl.ErrorListenerConsoleImpl; +import io.nats.client.support.Status; + +import static io.synadia.chaos.ChaosUtils.report; + +public class ChaosErrorListener extends ErrorListenerConsoleImpl { + private final String reportLabel; + + public ChaosErrorListener(String connectionName) { + this.reportLabel = "EL/" + connectionName; + } + + @Override + public void errorOccurred(Connection conn, String error) { + report(reportLabel, supplyMessage("[SEVERE] errorOccurred", conn, null, null, "Error: ", error)); + } + + @Override + public void exceptionOccurred(Connection conn, Exception exp) { + report(reportLabel, supplyMessage("[SEVERE] exceptionOccurred", conn, null, null, "Exception: ", exp)); + } + + @Override + public void slowConsumerDetected(Connection conn, Consumer consumer) { + } + + @Override + public void messageDiscarded(Connection conn, Message msg) { + } + + @Override + public void heartbeatAlarm(Connection conn, JetStreamSubscription sub, long lastStreamSequence, long lastConsumerSequence) { + report(reportLabel, supplyMessage("[SEVERE] heartbeatAlarm", conn, null, sub, "lastStreamSequence: ", lastStreamSequence, "lastConsumerSequence: ", lastConsumerSequence)); + } + + @Override + public void unhandledStatus(Connection conn, JetStreamSubscription sub, Status status) { + } + + @Override + public void pullStatusWarning(Connection conn, JetStreamSubscription sub, Status status) { + } + + @Override + public void pullStatusError(Connection conn, JetStreamSubscription sub, Status status) { + } + + @Override + public void flowControlProcessed(Connection conn, JetStreamSubscription sub, String subject, FlowControlSource source) { + } + + @Override + public void socketWriteTimeout(Connection conn) { + } +} diff --git a/chaos-runner/src/examples/java/io/synadia/examples/ChaosRunnerExample.java b/chaos-runner/src/examples/java/io/synadia/examples/ChaosRunnerExample.java new file mode 100644 index 0000000..9c6efcb --- /dev/null +++ b/chaos-runner/src/examples/java/io/synadia/examples/ChaosRunnerExample.java @@ -0,0 +1,57 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.Connection; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.synadia.chaos.ChaosRunner; +import io.synadia.chaos.ChaosStarter; + +import java.util.ArrayList; +import java.util.List; + +import static io.synadia.chaos.ChaosUtils.report; + +public class ChaosRunnerExample { + static final int NUM_CONNECTIONS = 2; + static final int SERVER_COUNT = 1; // try 1, 3, 5 + static final long DELAY = 5000; // the delay to bring a server down + static final long DOWN_TIME = 5000; // how long before bringing the server up + static final long STAY_ALIVE = 30_000; // how long to run the example program + + public static void main(String[] args) throws Exception { + ChaosRunner runner = new ChaosStarter() + .count(SERVER_COUNT) + .delay(DELAY) + .downTime(DOWN_TIME) + .start(); + + // just give the servers a little time to be ready be first connect + Thread.sleep(1000); + + String[] urls = runner.getConnectionUrls(); + List conns = new ArrayList<>(urls.length); + report("EXAMPLE", "Connection Urls:"); + for (String url : urls) { + report("EXAMPLE", " " + url); + } + + + for (int i = 0; i < NUM_CONNECTIONS; i++) { + String cn = "CONN/" + i; + Options options = Options.builder().servers(urls) + .connectionListener(new ChaosConnectionListener(cn)) + .errorListener(new ChaosErrorListener(cn)) + .build(); + + Connection connection = Nats.connect(options); + conns.add(connection); + report("EXAMPLE", "Initial connection for " + cn, "Port: " + connection.getServerInfo().getPort()); + } + + Thread.sleep(STAY_ALIVE); + System.exit(0); + } +} diff --git a/chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java b/chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java new file mode 100644 index 0000000..c681e0d --- /dev/null +++ b/chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java @@ -0,0 +1,180 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.chaos; + +import io.nats.ClusterInsert; +import io.nats.ClusterNode; +import io.nats.NatsServerRunner; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static io.nats.NatsRunnerUtils.*; +import static io.synadia.chaos.ChaosUtils.report; + +public class ChaosRunner { + + public final int count; + public final String clusterName; + public final String serverNamePrefix; + public final boolean js; + public final Path jsStoreDirBase; + public final long initialDelay; + public final long delay; + public final long downTime; + public final boolean random; + + private final List clusterInserts; + private final List natsServerRunners; + private final ScheduledThreadPoolExecutor executor; + private int downIx = 0; + + public void shutdown() { + try { + for (NatsServerRunner runner : natsServerRunners ) { + try { + runner.close(); + } + catch (Exception ignore) {} + } + } + catch (Exception ignore) {} + } + + public int[] getConnectionPorts() { + int[] ports = new int[count]; + for (int ix = 0; ix < count; ix++) { + ports[ix] = clusterInserts.get(ix).node.port; + } + return ports; + } + + public String[] getConnectionUrls() { + String[] urls = new String[count]; + for (int ix = 0; ix < count; ix++) { + urls[ix] = getNatsLocalhostUri(clusterInserts.get(ix).node.port); + } + return urls; + } + + public ChaosRunner(int count, String clusterName, String serverNamePrefix, boolean js, Path jsStoreDirBase, long initialDelay, long delay, long downTime, boolean random) throws IOException { + this.count = count; + this.clusterName = clusterName; + this.serverNamePrefix = serverNamePrefix; + this.js = js; + this.jsStoreDirBase = jsStoreDirBase; + this.initialDelay = initialDelay; + this.delay = delay; + this.downTime = downTime; + this.random = random; + + // delete jsStoreDirs for clean start + List nodes = createNodes(count, clusterName, serverNamePrefix, false, jsStoreDirBase); + clusterInserts = createClusterInserts(nodes); + for (ClusterInsert ci : clusterInserts) { + deleteDirContents(ci.node.jsStoreDir, false); + } + + // start runner + natsServerRunners = new ArrayList<>(); + for (int i = 0; i < count; i++) { + try { + natsServerRunners.add(createRunner(i)); + } + catch (Exception e) { + throw new IllegalStateException("Exception Creating Runner", e); + } + } + + executor = new ScheduledThreadPoolExecutor(2); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + executor.setRemoveOnCancelPolicy(true); + + scheduleDown(); + } + + private NatsServerRunner createRunner(int index) throws Exception { + ClusterInsert ci = clusterInserts.get(index); + NatsServerRunner.Builder b = NatsServerRunner.builder() + .debug(false) + .jetstream(true) + .configInserts(ci.configInserts) + .port(ci.node.port) + .connectCheckTries(0) + ; + return b.build(); + } + + private void scheduleDown() { + executor.schedule(this::downTask, delay, TimeUnit.MILLISECONDS); + } + + private void scheduleUp() { + executor.schedule(this::upTask, downTime, TimeUnit.MILLISECONDS); + } + + private void downTask() { + try { + if (random) { + downIx = ThreadLocalRandom.current().nextInt(count); + } + + NatsServerRunner runner = natsServerRunners.remove(downIx); + report("DOWN", runner.getPort()); + clusterInserts.add(clusterInserts.remove(downIx)); + runner.close(); + + scheduleUp(); + } + catch (Throwable e) { + report("DOWN/EX", e); + } + } + + private void upTask() { + try { + NatsServerRunner runner = createRunner(count - 1); + report("UP", runner.getPort()); + natsServerRunners.add(runner); + scheduleDown(); + } + catch (Throwable e) { + report("UP/EX: ", e); + scheduleUp(); + } + } + + private static void deleteDirContents(Path dir, boolean alsoDeleteSelf) throws IOException { + File fDir = dir.toFile(); + if (fDir.exists()) { + File[] items = fDir.listFiles(); + if (items != null) { + for (File item : items) { + if (item.isDirectory()) { + deleteDirContents(item.toPath(), true); + } + else { + if (!item.delete()) { + throw new IllegalStateException("Failed to delete: " + item.getAbsolutePath()); + } + } + } + } + if (alsoDeleteSelf && !fDir.delete()) { + throw new IllegalStateException("Failed to delete: " + fDir.getAbsolutePath()); + } + } + } + + @Override + public String toString() { + return ChaosUtils.toString(this, System.lineSeparator(), "", " ", ""); + } +} diff --git a/chaos-runner/src/main/java/io/synadia/chaos/ChaosStarter.java b/chaos-runner/src/main/java/io/synadia/chaos/ChaosStarter.java new file mode 100644 index 0000000..39424b4 --- /dev/null +++ b/chaos-runner/src/main/java/io/synadia/chaos/ChaosStarter.java @@ -0,0 +1,180 @@ +package io.synadia.chaos; + +import io.nats.NatsServerRunner; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.logging.Level; + +import static io.nats.NatsRunnerUtils.*; +import static io.synadia.chaos.ChaosUtils.report; + +public class ChaosStarter { + + int count = 3; + String clusterName = DEFAULT_CLUSTER_NAME; + String serverNamePrefix = DEFAULT_SERVER_NAME_PREFIX; + boolean js = true; + Path workDirectory; + long initialDelay = 10_000; + long delay = 5_000; + long downTime = 5_000; + boolean random = false; + + private static ChaosRunner INSTANCE; + + public ChaosStarter count(int count) { + this.count = count; + return this; + } + + public ChaosStarter clusterName(String clusterName) { + this.clusterName = clusterName; + return this; + } + + public ChaosStarter serverNamePrefix(String serverNamePrefix) { + this.serverNamePrefix = serverNamePrefix; + return this; + } + + public ChaosStarter js(boolean js) { + this.js = js; + return this; + } + + public ChaosStarter workDirectory(String workDirectory) { + return workDirectory(Paths.get(workDirectory)); + } + + public ChaosStarter workDirectory(File workDirectory) { + return workDirectory(workDirectory.getPath()); + } + + public ChaosStarter workDirectory(Path workDirectory) { + this.workDirectory = workDirectory; + return this; + } + + public ChaosStarter initialDelay(long initialDelay) { + this.initialDelay = initialDelay; + return this; + } + + public ChaosStarter delay(long delay) { + this.delay = delay; + return this; + } + + public ChaosStarter downTime(long downTime) { + this.downTime = downTime; + return this; + } + + public ChaosStarter random(boolean random) { + this.random = random; + return this; + } + + public ChaosStarter args(String[] args) { + if (args != null && args.length > 0) { + try { + for (int x = 0; x < args.length; x++) { + String arg = args[x].trim(); + switch (arg) { + case "--count": + count(Integer.parseInt(args[++x])); + break; + case "--delay": + delay(Long.parseLong(args[++x])); + break; + case "--initial": + initialDelay(Long.parseLong(args[++x])); + break; + case "--down": + downTime(Long.parseLong(args[++x])); + break; + case "--cname": + clusterName(args[++x]); + break; + case "--prefix": + serverNamePrefix(args[++x]); + break; + case "--dir": + workDirectory(args[++x]); + break; + case "--js": + js(true); + break; + case "--nojs": + js(false); + break; + case "--random": + random(true); + break; + case "--norandom": + random(false); + break; + case "": + break; + default: + error("Unknown argument: " + arg); + break; + } + } + } + catch (Exception e) { + error("Exception while parsing, most likely missing an argument value."); + } + } + return this; + } + + public void error(String errMsg) { + System.err.println("ERROR: " + errMsg); + System.exit(-1); + } + + public ChaosRunner start() throws IOException { + NatsServerRunner.setDefaultOutputLevel(Level.SEVERE); + + if (INSTANCE == null) { + if (workDirectory == null) { + workDirectory = getTemporaryJetStreamStoreDirBase(); + } + else if (!workDirectory.toFile().exists()) { + throw new IllegalArgumentException("Work directory does not exist: " + workDirectory); + } + + Path jsStoreDirBase = js ? workDirectory : null; + + Runtime.getRuntime().addShutdownHook( + new Thread("app-shutdown-hook") { + @Override + public void run() { + INSTANCE.shutdown(); + report("EXIT Chaos Runner"); + } + }); + + INSTANCE = new ChaosRunner( + count, + clusterName, + serverNamePrefix, + js, + jsStoreDirBase, + initialDelay, + delay, + downTime, + random); + + report("STARTER", INSTANCE); + } + else { + report("STARTER", "Chaos Runner previously started.", INSTANCE); + } + return INSTANCE; + } +} diff --git a/chaos-runner/src/main/java/io/synadia/chaos/ChaosUtils.java b/chaos-runner/src/main/java/io/synadia/chaos/ChaosUtils.java new file mode 100644 index 0000000..f0ee0a3 --- /dev/null +++ b/chaos-runner/src/main/java/io/synadia/chaos/ChaosUtils.java @@ -0,0 +1,44 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.chaos; + +public class ChaosUtils { + + public static String toString(ChaosRunner r) { + return toString(r, System.lineSeparator(), "", " ", ""); + } + + public static String toString(ChaosRunner r, String sep, String prefix, String indent, String outdent) { + String spi = sep + prefix + indent; + StringBuilder sb = new StringBuilder(prefix).append("Chaos Runner:"); + sb.append(spi).append("servers=").append(r.count).append(outdent) + .append(spi).append("js=").append(r.js).append(outdent); + if (r.count > 1) { + sb.append(spi).append("clusterName=").append(r.clusterName).append(outdent) + .append(spi).append("serverNamePrefix=").append(r.serverNamePrefix).append(outdent); + } + sb.append(spi).append("jsStoreDirBase=").append(r.jsStoreDirBase).append(outdent) + .append(spi).append("initialDelay=").append(r.initialDelay).append(outdent) + .append(spi).append("delay=").append(r.delay).append(outdent) + .append(spi).append("downTime=").append(r.downTime).append(outdent) + .append(spi).append("random=").append(r.random); + + return sb.toString(); + } + + public static void report(String label, Object... parts) { + String prefix = "[" + System.currentTimeMillis() + "] " + label; + StringBuilder sb = new StringBuilder(prefix); + for (Object part : parts) { + sb.append(" | "); + sb.append(part); + } + System.out.println(sb); + } + + public static void report(String label, ChaosRunner runner) { + String prefix = "[" + System.currentTimeMillis() + "] " + label + " | "; + System.out.println(toString(runner, System.lineSeparator(), prefix, " ", "")); + } +}