Skip to content

Commit 2c60a5d

Browse files
committed
Merge remote-tracking branch 'origin/main'
2 parents f654051 + a19e9b7 commit 2c60a5d

File tree

4 files changed

+167
-4
lines changed

4 files changed

+167
-4
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ Eventually the ability to schedule a message to publish based on a cron or sched
113113
**Current Release**: 0.0.1
114114
  **Current Snapshot**: 0.0.2-SNAPSHOT
115115

116-
[![README](https://img.shields.io/badge/README-blue?style=flat&link=scheduled-message/README.md)](scheduled-message/README.md)
116+
[![README](https://img.shields.io/badge/README-blue?style=flat&link=schedule-message/README.md)](schedule-message/README.md)
117117
![Artifact](https://img.shields.io/badge/Artifact-io.synadia:scheduled--message-00BC8E?labelColor=grey&style=flat)
118118
[![javadoc](https://javadoc.io/badge2/io.synadia/scheduled-message/javadoc.svg)](https://javadoc.io/doc/io.synadia/scheduled-message)
119119
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.synadia/scheduled-message/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.synadia/scheduled-message)
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
2+
// See LICENSE and NOTICE file for details.
3+
4+
package io.synadia.examples;
5+
6+
import io.nats.client.Connection;
7+
import io.nats.client.Nats;
8+
import io.nats.client.Options;
9+
import io.synadia.chaos.ChaosArguments;
10+
import io.synadia.chaos.ChaosRunner;
11+
12+
import java.io.BufferedReader;
13+
import java.io.IOException;
14+
import java.io.InputStream;
15+
import java.io.InputStreamReader;
16+
import java.net.URL;
17+
import java.util.ArrayList;
18+
import java.util.List;
19+
20+
import static io.synadia.chaos.ChaosUtils.out;
21+
22+
public class ChaosRunnerSpecificPortExample {
23+
private static final int SPECIFIC_PORT = 4222;
24+
private static final int SERVER_COUNT = 3; // 1, 3, 5
25+
private static final long DELAY = 3000; // the delay to bring a server down
26+
private static final long INITIAL_DELAY = 3000; // the delay to bring a server down the first time
27+
private static final long DOWN_TIME = 3000; // how long before bringing the server up
28+
private static final int HEALTH_CHECK_DELAY = 1000;
29+
30+
private static final int NUM_CONNECTIONS = 5;
31+
32+
public static void main(String[] args) throws Exception {
33+
ChaosArguments arguments = new ChaosArguments()
34+
.servers(SERVER_COUNT)
35+
.specificPort(SPECIFIC_PORT)
36+
.workDirectory("C:\\temp\\chaos-runner")
37+
.serverNamePrefix("cr-example-server")
38+
.clusterName("cr-example-cluster")
39+
.delay(DELAY)
40+
.initialDelay(INITIAL_DELAY)
41+
.downTime(DOWN_TIME)
42+
// this is done last so anything on the command line
43+
// is used over the hard coded items.
44+
.args(args);
45+
46+
ChaosRunner runner = ChaosRunner.start(arguments);
47+
48+
// just give the servers a little time to be ready be first connect
49+
Thread.sleep(1000);
50+
51+
String[] urls = runner.getConnectionUrls();
52+
out("Connection Urls");
53+
for (String url : urls) {
54+
out(" ", url);
55+
}
56+
57+
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
58+
List<Connection> connections = new ArrayList<>(urls.length);
59+
for (int i = 0; i < NUM_CONNECTIONS; i++) {
60+
String connectionName = "Conn" + (i + 1);
61+
Options options = Options.builder().servers(urls)
62+
.connectionListener(new ChaosConnectionListener(connectionName))
63+
.errorListener(new ChaosErrorListener(connectionName))
64+
.build();
65+
66+
Connection connection = Nats.connect(options);
67+
connections.add(connection);
68+
}
69+
70+
int[] ports = runner.getConnectionPorts();
71+
int[] monitorPorts = runner.getMonitorPorts();
72+
boolean hasMonitor = monitorPorts[0] > 0;
73+
74+
String[] hzs = new String[ports.length];
75+
while (true) {
76+
Thread.sleep(HEALTH_CHECK_DELAY);
77+
if (hasMonitor) {
78+
boolean changed = false;
79+
for (int i = 0; i < monitorPorts.length; i++) {
80+
String hz = readHealthz(monitorPorts[i]);
81+
if (!hz.equals(hzs[i])) {
82+
changed = true;
83+
hzs[i] = hz;
84+
}
85+
}
86+
if (changed) {
87+
out("HealthZ");
88+
for (int i = 0; i < monitorPorts.length; i++) {
89+
int port = ports[i];
90+
int mport = monitorPorts[i];
91+
out(" ", port + "/" + mport, hzs[i]);
92+
}
93+
}
94+
}
95+
}
96+
}
97+
98+
private static String readHealthz(int port) {
99+
return readEndpoint(port, "healthz");
100+
}
101+
102+
private static String readEndpoint(int port, String endpoint) {
103+
String sUrl = "http://localhost:" + port + "/" + endpoint;
104+
try {
105+
URL url = new URL(sUrl);
106+
InputStream inputStream = url.openStream();
107+
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
108+
109+
boolean first = true;
110+
String line;
111+
StringBuilder content = new StringBuilder();
112+
while ((line = reader.readLine()) != null) {
113+
if (first) {
114+
first = false;
115+
}
116+
else {
117+
content.append(System.lineSeparator());
118+
}
119+
content.append(line);
120+
}
121+
reader.close();
122+
return content.toString().trim();
123+
}
124+
catch (IOException e) {
125+
return e.getMessage();
126+
}
127+
}
128+
}

chaos-runner/src/main/java/io/synadia/chaos/ChaosArguments.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public class ChaosArguments {
1818
long delay = 5_000;
1919
long downTime = 5_000;
2020
boolean random = false;
21+
int specificPort = -1;
2122
int port = 4222;
2223
int listen = 4232;
2324
int monitor = 4282;
@@ -92,6 +93,11 @@ public ChaosArguments port(int port) {
9293
return this;
9394
}
9495

96+
public ChaosArguments specificPort(int port) {
97+
this.specificPort = port;
98+
return this;
99+
}
100+
95101
public ChaosArguments listen(int listen) {
96102
this.listen = listen;
97103
return this;
@@ -138,6 +144,9 @@ public ChaosArguments args(String[] args) {
138144
case "--port":
139145
port(Integer.parseInt(args[++x]));
140146
break;
147+
case "--sport":
148+
specificPort(Integer.parseInt(args[++x]));
149+
break;
141150
case "--listen":
142151
listen(Integer.parseInt(args[++x]));
143152
break;

chaos-runner/src/main/java/io/synadia/chaos/ChaosRunner.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ChaosRunner {
3737
public final long delay;
3838
public final long downTime;
3939
public final boolean random;
40+
public final int specificPort;
4041
public final int port;
4142
public final int listen;
4243
public final int monitor;
@@ -101,15 +102,23 @@ private void scheduleUp() {
101102

102103
private void downTask() {
103104
try {
104-
if (random) {
105+
if (specificPort != -1) {
106+
for (int i = 0; i < natsServerRunners.size(); i++) {
107+
NatsServerRunner nsr = natsServerRunners.get(i);
108+
if (nsr.getPort() == specificPort) {
109+
downIx = i;
110+
break;
111+
}
112+
}
113+
}
114+
else if (random) {
105115
downIx = ThreadLocalRandom.current().nextInt(servers);
106116
}
107117

108118
NatsServerRunner runner = natsServerRunners.remove(downIx);
109-
printer.out(CR_LABEL, "DOWN", runner.getPort());
119+
printer.out(CR_LABEL, "DOWN", runner.getPort());
110120
clusterInserts.add(clusterInserts.remove(downIx));
111121
runner.close();
112-
113122
scheduleUp();
114123
}
115124
catch (Throwable e) {
@@ -179,12 +188,16 @@ else if (!a.workDirectory.toFile().exists()) {
179188
this.delay = a.delay;
180189
this.downTime = a.downTime;
181190
this.random = a.random;
191+
this.specificPort = a.specificPort;
182192
this.port = a.port;
183193
this.listen = a.listen;
184194
this.monitor = a.monitor;
185195

186196
natsServerRunners = new ArrayList<>();
187197
if (servers == 1) {
198+
if (specificPort != -1 && specificPort != port) {
199+
throw new IllegalArgumentException("Invalid specific port");
200+
}
188201
List<String> inserts = new ArrayList<>();
189202
ClusterNode cn;
190203
Path jsStorePath = Paths.get(jsStoreDirBase.toString(), "" + port);
@@ -217,6 +230,19 @@ else if (!a.workDirectory.toFile().exists()) {
217230
}
218231
else {
219232
List<ClusterNode> cns = createNodes(servers, clusterName, serverNamePrefix, jsStoreDirBase, DEFAULT_HOST, port, listen, monitor < 1 ? null : monitor);
233+
if (specificPort != -1) {
234+
boolean found = false;
235+
for (ClusterNode cn : cns) {
236+
if (cn.port == specificPort) {
237+
found = true;
238+
break;
239+
}
240+
}
241+
if (!found) {
242+
throw new IllegalArgumentException("Invalid specific port");
243+
}
244+
}
245+
220246
clusterInserts = createClusterInserts(cns);
221247
}
222248

0 commit comments

Comments
 (0)