Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chaos-runner/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ repositories {

dependencies {
implementation 'io.nats:jnats:2.21.1'
implementation 'io.nats:jnats-server-runner:2.0.2-SNAPSHOT'

testImplementation 'commons-codec:commons-codec:1.18.0'
testImplementation 'io.nats:jnats-server-runner:1.2.8'
Expand Down
1 change: 1 addition & 0 deletions chaos-runner/chaos.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
java -cp "build/libs/*" io.synadia.tools.ChaosRunner %1 %2 %3 %4 %5 %6 %7 %8 %9
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.examples;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;

import static io.synadia.chaos.ChaosUtils.report;

public class ChaosConnectionListener implements ConnectionListener {
private final String reportLabel;

public ChaosConnectionListener(String connectionName) {
this.reportLabel = "CL/" + connectionName;
}

@Override
public void connectionEvent(Connection conn, Events type) {
report(reportLabel, type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.examples;

import io.nats.client.Connection;
import io.nats.client.Consumer;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.impl.ErrorListenerConsoleImpl;
import io.nats.client.support.Status;

import static io.synadia.chaos.ChaosUtils.report;

public class ChaosErrorListener extends ErrorListenerConsoleImpl {
private final String reportLabel;

public ChaosErrorListener(String connectionName) {
this.reportLabel = "EL/" + connectionName;
}

@Override
public void errorOccurred(Connection conn, String error) {
report(reportLabel, supplyMessage("[SEVERE] errorOccurred", conn, null, null, "Error: ", error));
}

@Override
public void exceptionOccurred(Connection conn, Exception exp) {
report(reportLabel, supplyMessage("[SEVERE] exceptionOccurred", conn, null, null, "Exception: ", exp));
}

@Override
public void slowConsumerDetected(Connection conn, Consumer consumer) {
}

@Override
public void messageDiscarded(Connection conn, Message msg) {
}

@Override
public void heartbeatAlarm(Connection conn, JetStreamSubscription sub, long lastStreamSequence, long lastConsumerSequence) {
report(reportLabel, supplyMessage("[SEVERE] heartbeatAlarm", conn, null, sub, "lastStreamSequence: ", lastStreamSequence, "lastConsumerSequence: ", lastConsumerSequence));
}

@Override
public void unhandledStatus(Connection conn, JetStreamSubscription sub, Status status) {
}

@Override
public void pullStatusWarning(Connection conn, JetStreamSubscription sub, Status status) {
}

@Override
public void pullStatusError(Connection conn, JetStreamSubscription sub, Status status) {
}

@Override
public void flowControlProcessed(Connection conn, JetStreamSubscription sub, String subject, FlowControlSource source) {
}

@Override
public void socketWriteTimeout(Connection conn) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.examples;

import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.synadia.chaos.ChaosRunner;
import io.synadia.chaos.ChaosStarter;

import java.util.ArrayList;
import java.util.List;

import static io.synadia.chaos.ChaosUtils.report;

public class ChaosRunnerExample {
static final int NUM_CONNECTIONS = 2;
static final int SERVER_COUNT = 1; // try 1, 3, 5
static final long DELAY = 5000; // the delay to bring a server down
static final long DOWN_TIME = 5000; // how long before bringing the server up
static final long STAY_ALIVE = 30_000; // how long to run the example program

public static void main(String[] args) throws Exception {
ChaosRunner runner = new ChaosStarter()
.count(SERVER_COUNT)
.delay(DELAY)
.downTime(DOWN_TIME)
.start();

// just give the servers a little time to be ready be first connect
Thread.sleep(1000);

String[] urls = runner.getConnectionUrls();
List<Connection> conns = new ArrayList<>(urls.length);
report("EXAMPLE", "Connection Urls:");
for (String url : urls) {
report("EXAMPLE", " " + url);
}


for (int i = 0; i < NUM_CONNECTIONS; i++) {
String cn = "CONN/" + i;
Options options = Options.builder().servers(urls)
.connectionListener(new ChaosConnectionListener(cn))
.errorListener(new ChaosErrorListener(cn))
.build();

Connection connection = Nats.connect(options);
conns.add(connection);
report("EXAMPLE", "Initial connection for " + cn, "Port: " + connection.getServerInfo().getPort());
}

Thread.sleep(STAY_ALIVE);
System.exit(0);
}
}
180 changes: 180 additions & 0 deletions chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.chaos;

import io.nats.ClusterInsert;
import io.nats.ClusterNode;
import io.nats.NatsServerRunner;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import static io.nats.NatsRunnerUtils.*;
import static io.synadia.chaos.ChaosUtils.report;

public class ChaosRunner {

public final int count;
public final String clusterName;
public final String serverNamePrefix;
public final boolean js;
public final Path jsStoreDirBase;
public final long initialDelay;
public final long delay;
public final long downTime;
public final boolean random;

private final List<ClusterInsert> clusterInserts;
private final List<NatsServerRunner> natsServerRunners;
private final ScheduledThreadPoolExecutor executor;
private int downIx = 0;

public void shutdown() {
try {
for (NatsServerRunner runner : natsServerRunners ) {
try {
runner.close();
}
catch (Exception ignore) {}
}
}
catch (Exception ignore) {}
}

public int[] getConnectionPorts() {
int[] ports = new int[count];
for (int ix = 0; ix < count; ix++) {
ports[ix] = clusterInserts.get(ix).node.port;
}
return ports;
}

public String[] getConnectionUrls() {
String[] urls = new String[count];
for (int ix = 0; ix < count; ix++) {
urls[ix] = getNatsLocalhostUri(clusterInserts.get(ix).node.port);
}
return urls;
}

public ChaosRunner(int count, String clusterName, String serverNamePrefix, boolean js, Path jsStoreDirBase, long initialDelay, long delay, long downTime, boolean random) throws IOException {
this.count = count;
this.clusterName = clusterName;
this.serverNamePrefix = serverNamePrefix;
this.js = js;
this.jsStoreDirBase = jsStoreDirBase;
this.initialDelay = initialDelay;
this.delay = delay;
this.downTime = downTime;
this.random = random;

// delete jsStoreDirs for clean start
List<ClusterNode> nodes = createNodes(count, clusterName, serverNamePrefix, false, jsStoreDirBase);
clusterInserts = createClusterInserts(nodes);
for (ClusterInsert ci : clusterInserts) {
deleteDirContents(ci.node.jsStoreDir, false);
}

// start runner
natsServerRunners = new ArrayList<>();
for (int i = 0; i < count; i++) {
try {
natsServerRunners.add(createRunner(i));
}
catch (Exception e) {
throw new IllegalStateException("Exception Creating Runner", e);
}
}

executor = new ScheduledThreadPoolExecutor(2);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRemoveOnCancelPolicy(true);

scheduleDown();
}

private NatsServerRunner createRunner(int index) throws Exception {
ClusterInsert ci = clusterInserts.get(index);
NatsServerRunner.Builder b = NatsServerRunner.builder()
.debug(false)
.jetstream(true)
.configInserts(ci.configInserts)
.port(ci.node.port)
.connectCheckTries(0)
;
return b.build();
}

private void scheduleDown() {
executor.schedule(this::downTask, delay, TimeUnit.MILLISECONDS);
}

private void scheduleUp() {
executor.schedule(this::upTask, downTime, TimeUnit.MILLISECONDS);
}

private void downTask() {
try {
if (random) {
downIx = ThreadLocalRandom.current().nextInt(count);
}

NatsServerRunner runner = natsServerRunners.remove(downIx);
report("DOWN", runner.getPort());
clusterInserts.add(clusterInserts.remove(downIx));
runner.close();

scheduleUp();
}
catch (Throwable e) {
report("DOWN/EX", e);
}
}

private void upTask() {
try {
NatsServerRunner runner = createRunner(count - 1);
report("UP", runner.getPort());
natsServerRunners.add(runner);
scheduleDown();
}
catch (Throwable e) {
report("UP/EX: ", e);
scheduleUp();
}
}

private static void deleteDirContents(Path dir, boolean alsoDeleteSelf) throws IOException {
File fDir = dir.toFile();
if (fDir.exists()) {
File[] items = fDir.listFiles();
if (items != null) {
for (File item : items) {
if (item.isDirectory()) {
deleteDirContents(item.toPath(), true);
}
else {
if (!item.delete()) {
throw new IllegalStateException("Failed to delete: " + item.getAbsolutePath());
}
}
}
}
if (alsoDeleteSelf && !fDir.delete()) {
throw new IllegalStateException("Failed to delete: " + fDir.getAbsolutePath());
}
}
}

@Override
public String toString() {
return ChaosUtils.toString(this, System.lineSeparator(), "", " ", "");
}
}
Loading