Skip to content

Commit c3ecf13

Browse files
authored
Merge pull request #7 from synadia-io/publish-1
Start Publisher 0.1.0
2 parents 907644a + ac1e9cb commit c3ecf13

File tree

13 files changed

+468
-258
lines changed

13 files changed

+468
-258
lines changed

js-publish-extensions/README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
![Synadia](src/main/javadoc/images/synadia-logo.png)      ![NATS](src/main/javadoc/images/large-logo.png)
22

3-
# JNATS Managed Async JetStream Publisher Extension
3+
# JNATS JetStream Publisher Extensions
44

5-
Utility to automatically publish async with load management and error handling.
5+
The library contains 2 utilities.
6+
7+
### PublishRetrier
8+
9+
This class parallels the standard JetStream publish api with methods that will retry the publish
610

711
**Current Release**: 0.0.1   **Current Snapshot**: 0.0.2-SNAPSHOT
812

js-publish-extensions/build.gradle

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ plugins {
1313
id 'signing'
1414
}
1515

16-
def jarVersion = "0.0.11"
16+
def jarVersion = "0.1.0"
1717
group = 'io.synadia'
1818

1919
def isMerge = System.getenv("BUILD_EVENT") == "push"
@@ -32,11 +32,17 @@ repositories {
3232
maven {
3333
url "https://oss.sonatype.org/content/repositories/releases/"
3434
}
35+
maven {
36+
url "https://oss.sonatype.org/content/repositories/snapshots"
37+
}
38+
maven {
39+
url "https://s01.oss.sonatype.org/content/repositories/snapshots/"
40+
}
3541
}
3642

3743
dependencies {
38-
implementation 'io.nats:jnats:2.18.1'
39-
implementation 'io.synadia:jnats-retrier:0.0.1'
44+
implementation 'io.nats:jnats:2.18.2-SNAPSHOT'
45+
implementation 'io.synadia:retrier:0.2.0-SNAPSHOT'
4046

4147
testImplementation 'io.nats:jnats-server-runner:1.2.8'
4248
testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// Copyright (c) 2024 Synadia Communications Inc. All Rights Reserved.
2+
// See LICENSE and NOTICE file for details.
3+
4+
package io.synadia.examples;
5+
6+
import io.nats.client.Connection;
7+
import io.nats.client.JetStreamApiException;
8+
import io.nats.client.Nats;
9+
import io.nats.client.Options;
10+
import io.nats.client.api.StorageType;
11+
import io.nats.client.api.StreamConfiguration;
12+
import io.nats.client.impl.ErrorListenerConsoleImpl;
13+
import io.synadia.jnats.extension.AsyncJsPublishListener;
14+
import io.synadia.jnats.extension.AsyncJsPublisher;
15+
import io.synadia.jnats.extension.Flight;
16+
17+
import java.io.IOException;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
import static io.synadia.retrier.RetryConfig.DEFAULT_CONFIG;
24+
25+
public class AsyncJsPublisherExample {
26+
27+
public static final int COUNT = 100_000;
28+
public static final String STREAM = "managed";
29+
public static final String SUBJECT = "managed_subject";
30+
31+
public static final boolean USE_RETRIER = false; // set this to true to have each publish use retry logic
32+
public static final boolean BUILT_IN_START = true; // set this to false in order to demonstrate the custom start
33+
34+
public static void main(String[] args) {
35+
Options options = Options.builder()
36+
.connectionListener((connection, events) -> print("Connection Event:" + events.getEvent()))
37+
.errorListener(new ErrorListenerConsoleImpl())
38+
.build();
39+
40+
try (Connection nc = Nats.connect(options)) {
41+
setupStream(nc);
42+
43+
AsyncJsPublishListener publishListener = new ExamplePublishListener();
44+
45+
AsyncJsPublisher.Builder builder =
46+
AsyncJsPublisher.builder(nc.jetStream())
47+
.publishListener(publishListener);
48+
49+
if (USE_RETRIER) {
50+
builder.retryConfig(DEFAULT_CONFIG);
51+
}
52+
53+
if (BUILT_IN_START) {
54+
// the publisher is AutoCloseable
55+
try (AsyncJsPublisher publisher = builder.start()) {
56+
publish(publisher, publishListener);
57+
}
58+
}
59+
else {
60+
// custom notification executor
61+
ExecutorService notificationExecutorService = Executors.newFixedThreadPool(1);
62+
builder.notificationExecutorService(notificationExecutorService);
63+
64+
AsyncJsPublisher publisher = builder.build();
65+
66+
// this custom start mimics what the built-in does but
67+
// shows how to access the publish / flights runner Runnable(s)
68+
Thread publishRunnerThread = new Thread(publisher::publishRunner);
69+
publishRunnerThread.start();
70+
Thread flightsRunnerThread = new Thread(publisher::flightsRunner);
71+
flightsRunnerThread.start();
72+
73+
// same publish logic as the built-in start
74+
publish(publisher, publishListener);
75+
76+
// if you have a custom start, you probably want some custom closing
77+
// again, the example mimics what the built-in does
78+
// don't forget to call the publisher close, because it does some stuff
79+
publisher.close();
80+
notificationExecutorService.shutdown();
81+
if (!publisher.getPublishRunnerDoneLatch().await(publisher.getPollTime(), TimeUnit.MILLISECONDS)) {
82+
publishRunnerThread.interrupt();
83+
}
84+
if (!publisher.getFlightsRunnerDoneLatch().await(publisher.getPollTime(), TimeUnit.MILLISECONDS)) {
85+
flightsRunnerThread.interrupt();
86+
}
87+
}
88+
}
89+
catch (Exception e) {
90+
//noinspection CallToPrintStackTrace
91+
e.printStackTrace();
92+
}
93+
}
94+
95+
private static void publish(AsyncJsPublisher publisher, AsyncJsPublishListener publishListener) throws InterruptedException {
96+
for (int x = 0; x < COUNT; x++) {
97+
publisher.publishAsync(SUBJECT, ("data-" + x).getBytes());
98+
}
99+
100+
while (publisher.preFlightSize() > 0) {
101+
System.out.println(publishListener);
102+
//noinspection BusyWait
103+
Thread.sleep(1000);
104+
}
105+
106+
while (publisher.inFlightSize() > 0) {
107+
System.out.println(publishListener);
108+
//noinspection BusyWait
109+
Thread.sleep(1000);
110+
}
111+
112+
System.out.println(publishListener);
113+
}
114+
115+
private static void setupStream(Connection nc) {
116+
try {
117+
nc.jetStreamManagement().deleteStream(STREAM);
118+
}
119+
catch (Exception ignore) {}
120+
try {
121+
System.out.println("Creating Stream @ " + System.currentTimeMillis());
122+
nc.jetStreamManagement().addStream(StreamConfiguration.builder()
123+
.name(STREAM)
124+
.subjects(SUBJECT)
125+
.storageType(StorageType.File)
126+
.build());
127+
}
128+
catch (IOException | JetStreamApiException e) {
129+
throw new RuntimeException(e);
130+
}
131+
}
132+
133+
static class ExamplePublishListener implements AsyncJsPublishListener {
134+
public AtomicLong published = new AtomicLong();
135+
public AtomicLong acked = new AtomicLong();
136+
public AtomicLong exceptioned = new AtomicLong();
137+
public AtomicLong timedOut = new AtomicLong();
138+
139+
@Override
140+
public String toString() {
141+
return "published=" + published +
142+
", acked=" + acked +
143+
", exceptioned=" + exceptioned +
144+
", timed out=" + timedOut;
145+
}
146+
147+
@Override
148+
public void published(Flight flight) {
149+
published.incrementAndGet();
150+
}
151+
152+
@Override
153+
public void acked(Flight flight) {
154+
acked.incrementAndGet();
155+
}
156+
157+
@Override
158+
public void completedExceptionally(Flight flight) {
159+
try {
160+
exceptioned.incrementAndGet();
161+
flight.publishAckFuture.get();
162+
}
163+
catch (Exception e) {
164+
print("completedExceptionally", new String(flight.getBody()), e.toString());
165+
}
166+
}
167+
168+
@Override
169+
public void timeout(Flight flight) {
170+
try {
171+
timedOut.incrementAndGet();
172+
flight.publishAckFuture.get();
173+
}
174+
catch (Exception e) {
175+
print("timeout", new String(flight.getBody()), e.toString());
176+
}
177+
}
178+
}
179+
180+
private static void print(String... strings) {
181+
System.out.println(String.join(" | ", strings));
182+
}
183+
}

js-publish-extensions/src/examples/java/io/synadia/examples/ManagedExample.java

Lines changed: 0 additions & 135 deletions
This file was deleted.
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,17 @@
99
import io.nats.client.api.PublishAck;
1010
import io.nats.client.api.StorageType;
1111
import io.nats.client.api.StreamConfiguration;
12-
import io.synadia.jnats.extension.Retrier;
13-
import io.synadia.jnats.extension.RetryConfig;
12+
import io.synadia.jnats.extension.PublishRetrier;
13+
import io.synadia.retrier.RetryConfig;
1414

1515
import java.io.IOException;
1616
import java.util.concurrent.CompletableFuture;
1717
import java.util.concurrent.TimeUnit;
1818

19-
public class RetrierPublishAsyncExample {
19+
public class PublishRetrierAsyncExample {
2020

21-
public static String STREAM = "retrierA";
22-
public static String SUBJECT = "retrierA-subject";
21+
public static String STREAM = "pr-async-stream";
22+
public static String SUBJECT = "pr-async-subject";
2323

2424
public static void main(String[] args) {
2525
try (Connection nc = Nats.connect()) {
@@ -53,7 +53,7 @@ public static void main(String[] args) {
5353
long now = System.currentTimeMillis();
5454

5555
System.out.println("Publishing @ " + now);
56-
CompletableFuture<PublishAck> cfpa = Retrier.publishAsync(config, nc.jetStream(), SUBJECT, null);
56+
CompletableFuture<PublishAck> cfpa = PublishRetrier.publishAsync(config, nc.jetStream(), SUBJECT, null);
5757
PublishAck pa = cfpa.get(30, TimeUnit.SECONDS);
5858
long done = System.currentTimeMillis();
5959

0 commit comments

Comments
 (0)