Skip to content

Commit 4235ab8

Browse files
authored
refactor: fix code semells (#134)
* refactor: fix code smells * docs: fix rabbitmq docs * refactor: convert message body to string --------- Co-authored-by: luisgomez29 <Luis Guillermo Gómez Galeano>
1 parent ae1f5ec commit 4235ab8

File tree

98 files changed

+923
-1038
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+923
-1038
lines changed

acceptance/async-tests/src/test/java/org/reactivecommons/test/CommandsProcessPerfTest.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
class CommandsProcessPerfTest {
2828

2929
private static final String COMMAND_NAME = "app.command.test";
30-
private static final int messageCount = 40000;
30+
private static final int MESSAGE_COUNT = 40000;
3131
private static final Semaphore semaphore = new Semaphore(0);
3232
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
3333

@@ -41,18 +41,18 @@ class CommandsProcessPerfTest {
4141
@Test
4242
void commandShouldArrive() throws InterruptedException {
4343
final long init_p = System.currentTimeMillis();
44-
createMessages(messageCount);
44+
createMessages(MESSAGE_COUNT);
4545
final long end_p = System.currentTimeMillis() - init_p;
4646
System.out.println("Total Publication Time: " + end_p + "ms");
4747

4848
latch.countDown();
4949
final long init = System.currentTimeMillis();
50-
semaphore.acquire(messageCount);
50+
semaphore.acquire(MESSAGE_COUNT);
5151
final long end = System.currentTimeMillis();
5252

5353
final long total = end - init;
54-
final double microsPerMessage = ((total + 0.0) / messageCount) * 1000;
55-
System.out.println("Message count: " + messageCount);
54+
final double microsPerMessage = ((total + 0.0) / MESSAGE_COUNT) * 1000;
55+
System.out.println("Message count: " + MESSAGE_COUNT);
5656
System.out.println("Total Execution Time: " + total + "ms");
5757
System.out.println("Microseconds per message: " + microsPerMessage + "us");
5858
if (System.getProperty("env.ci") == null) {
@@ -82,7 +82,10 @@ public static void main(String[] args) {
8282

8383
@Bean
8484
public HandlerRegistry registry() {
85-
final HandlerRegistry registry = range(0, 20).reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
85+
final HandlerRegistry registry = range(0, 20)
86+
.reduce(HandlerRegistry.register(), (r, i) ->
87+
r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class))
88+
.block();
8689
return registry
8790
.handleCommand(COMMAND_NAME, this::handleSimple, DummyMessage.class);
8891
}

acceptance/async-tests/src/test/java/org/reactivecommons/test/DirectGatewayPerfTest.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
class DirectGatewayPerfTest {
2525

2626
private static final String COMMAND_NAME = "app.command.test";
27-
private static final int messageCount = 40000;
27+
private static final int MESSAGE_COUNT = 40000;
2828
private static final Semaphore semaphore = new Semaphore(0);
2929

3030
@Autowired
@@ -36,17 +36,17 @@ class DirectGatewayPerfTest {
3636

3737
@Test
3838
void shouldSendInOptimalTime() throws InterruptedException {
39-
final Flux<Command<DummyMessage>> messages = createMessages(messageCount);
39+
final Flux<Command<DummyMessage>> messages = createMessages(MESSAGE_COUNT);
4040
final Flux<Void> target = messages.flatMap(dummyMessageCommand ->
4141
gateway.sendCommand(dummyMessageCommand, appName)
4242
.doOnSuccess(aVoid -> semaphore.release()));
4343

4444
final long init = System.currentTimeMillis();
4545
target.subscribe();
46-
semaphore.acquire(messageCount);
46+
semaphore.acquire(MESSAGE_COUNT);
4747
final long end = System.currentTimeMillis();
4848

49-
assertMessageThroughput(end - init, messageCount, 200);
49+
assertMessageThroughput(end - init, MESSAGE_COUNT, 200);
5050
}
5151

5252
@Test
@@ -67,8 +67,10 @@ void shouldSendBatchInOptimalTime1Channel() throws InterruptedException {
6767
private void shouldSendBatchInOptimalTimeNChannels(int channels) throws InterruptedException {
6868
List<Mono<Void>> subs = new ArrayList<>(channels);
6969
for (int i = 0; i < channels; ++i) {
70-
final Flux<Command<DummyMessage>> messages = createMessages(messageCount / channels);
71-
final Mono<Void> target = gateway.sendCommands(messages, appName).then().doOnSuccess(_v -> semaphore.release());
70+
final Flux<Command<DummyMessage>> messages = createMessages(MESSAGE_COUNT / channels);
71+
final Mono<Void> target = gateway.sendCommands(messages, appName)
72+
.then()
73+
.doOnSuccess(_v -> semaphore.release());
7274
subs.add(target);
7375
}
7476

@@ -79,7 +81,7 @@ private void shouldSendBatchInOptimalTimeNChannels(int channels) throws Interrup
7981
final long end = System.currentTimeMillis();
8082

8183
final long total = end - init;
82-
assertMessageThroughput(total, messageCount, 230);
84+
assertMessageThroughput(total, MESSAGE_COUNT, 230);
8385
}
8486

8587
private void assertMessageThroughput(long total, long messageCount, int reqMicrosPerMessage) {
@@ -94,7 +96,9 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro
9496
}
9597

9698
private Flux<Command<DummyMessage>> createMessages(int count) {
97-
final List<Command<DummyMessage>> commands = IntStream.range(0, count).mapToObj(value -> new Command<>(COMMAND_NAME, UUID.randomUUID().toString(), new DummyMessage())).collect(Collectors.toList());
99+
final List<Command<DummyMessage>> commands = IntStream.range(0, count)
100+
.mapToObj(value -> new Command<>(COMMAND_NAME, UUID.randomUUID().toString(), new DummyMessage()))
101+
.collect(Collectors.toList());
98102
return Flux.fromIterable(commands);
99103
}
100104

acceptance/async-tests/src/test/java/org/reactivecommons/test/DynamicRegistryTest.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.springframework.boot.SpringApplication;
1414
import org.springframework.boot.autoconfigure.SpringBootApplication;
1515
import org.springframework.boot.test.context.SpringBootTest;
16-
import reactor.core.publisher.UnicastProcessor;
16+
import reactor.core.publisher.Sinks;
1717
import reactor.test.StepVerifier;
1818

1919
import java.time.Duration;
@@ -35,28 +35,26 @@ class DynamicRegistryTest {
3535

3636
@Test
3737
void shouldReceiveResponse() {
38-
UnicastProcessor<String> result = UnicastProcessor.create();
39-
DomainEventHandler<String> fn = message -> fromRunnable(() -> result.onNext(message.getData()));
38+
Sinks.Many<String> result = Sinks.many().unicast().onBackpressureBuffer();
39+
DomainEventHandler<String> fn = message -> fromRunnable(
40+
() -> result.emitNext(message.getData(), Sinks.EmitFailureHandler.FAIL_FAST)
41+
);
4042

4143
dynamicRegistry.listenEvent("test.event", fn, String.class).block();
4244
final Publisher<Void> emit = eventBus.emit(new DomainEvent<>("test.event", "42", "Hello"));
4345
from(emit).block();
4446

45-
StepVerifier.create(result.next().timeout(Duration.ofSeconds(10)))
47+
StepVerifier.create(result.asFlux().next().timeout(Duration.ofSeconds(10)))
4648
.expectNext("Hello")
4749
.verifyComplete();
48-
49-
5050
}
5151

52-
5352
@SpringBootApplication
5453
@EnableMessageListeners
5554
@EnableDomainEventBus
5655
static class App {
5756
public static void main(String[] args) {
5857
SpringApplication.run(App.class, args);
5958
}
60-
6159
}
6260
}

acceptance/async-tests/src/test/java/org/reactivecommons/test/QueryProcessPerfTest.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
class QueryProcessPerfTest {
3131

3232
private static final String QUERY_NAME = "app.command.test";
33-
private static final int messageCount = 40000;
33+
private static final int MESSAGE_COUNT = 40000;
3434
private static final Semaphore semaphore = new Semaphore(0);
3535
private static final AtomicLong atomicLong = new AtomicLong(0);
3636
private static final CountDownLatch latch = new CountDownLatch(12 + 1);
@@ -44,19 +44,20 @@ class QueryProcessPerfTest {
4444

4545
@Test
4646
void serveQueryPerformanceTest() throws InterruptedException {
47-
final Flux<AsyncQuery<DummyMessage>> messages = createMessages(messageCount);
47+
final Flux<AsyncQuery<DummyMessage>> messages = createMessages(MESSAGE_COUNT);
4848

4949
final long init = System.currentTimeMillis();
5050
messages
51-
.flatMap(dummyMessageAsyncQuery -> gateway.requestReply(dummyMessageAsyncQuery, appName, DummyMessage.class)
52-
.doOnNext(s -> semaphore.release())
51+
.flatMap(dummyMessageAsyncQuery ->
52+
gateway.requestReply(dummyMessageAsyncQuery, appName, DummyMessage.class)
53+
.doOnNext(s -> semaphore.release())
5354
)
5455
.subscribe();
55-
semaphore.acquire(messageCount);
56+
semaphore.acquire(MESSAGE_COUNT);
5657
final long end = System.currentTimeMillis();
5758

5859
final long total = end - init;
59-
assertMessageThroughput(total, messageCount, 200);
60+
assertMessageThroughput(total, MESSAGE_COUNT, 200);
6061
}
6162

6263
private void assertMessageThroughput(long total, long messageCount, int reqMicrosPerMessage) {
@@ -72,7 +73,9 @@ private void assertMessageThroughput(long total, long messageCount, int reqMicro
7273

7374

7475
private Flux<AsyncQuery<DummyMessage>> createMessages(int count) {
75-
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count).mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage())).collect(Collectors.toList());
76+
final List<AsyncQuery<DummyMessage>> queryList = IntStream.range(0, count)
77+
.mapToObj(_v -> new AsyncQuery<>(QUERY_NAME, new DummyMessage()))
78+
.collect(Collectors.toList());
7679
return Flux.fromIterable(queryList);
7780
}
7881

@@ -87,7 +90,11 @@ public static void main(String[] args) {
8790

8891
@Bean
8992
public HandlerRegistry registry() {
90-
final HandlerRegistry registry = range(0, 20).reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand("app.command.name" + i, message -> Mono.empty(), Map.class)).block();
93+
final HandlerRegistry registry = range(0, 20)
94+
.reduce(HandlerRegistry.register(), (r, i) -> r.handleCommand(
95+
"app.command.name" + i, message -> Mono.empty(), Map.class
96+
))
97+
.block();
9198
return registry
9299
.serveQuery(QUERY_NAME, this::handleSimple, DummyMessage.class);
93100
}

acceptance/async-tests/src/test/java/org/reactivecommons/test/SimpleDirectCommunicationTest.java

+16-19
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.springframework.boot.test.context.SpringBootTest;
1616
import org.springframework.context.annotation.Bean;
1717
import reactor.core.publisher.Mono;
18-
import reactor.core.publisher.UnicastProcessor;
18+
import reactor.core.publisher.Sinks;
1919
import reactor.test.StepVerifier;
2020

2121
import java.time.Duration;
@@ -36,55 +36,52 @@ class SimpleDirectCommunicationTest {
3636
@Value("${spring.application.name}")
3737
private String appName;
3838

39-
@Autowired
40-
private UnicastProcessor<Command<Long>> listener;
41-
42-
private String commandId = ThreadLocalRandom.current().nextInt() + "";
43-
private Long data = ThreadLocalRandom.current().nextLong();
39+
private final String commandId = ThreadLocalRandom.current().nextInt() + "";
40+
private final Long data = ThreadLocalRandom.current().nextLong();
4441

4542
@Test
4643
void commandShouldArrive() {
4744
Command<Long> command = new Command<>(COMMAND_NAME, commandId, data);
4845
gateway.sendCommand(command, appName).subscribe();
46+
Sinks.Many<Command<Long>> listener = Sinks.many().unicast().onBackpressureBuffer();
4947

50-
StepVerifier.create(listener.next()).assertNext(cmd -> {
48+
StepVerifier.create(listener.asFlux().next()).assertNext(cmd -> {
5149
assertThat(cmd).extracting(Command::getCommandId, Command::getData, Command::getName)
52-
.containsExactly(commandId, data, COMMAND_NAME);
50+
.containsExactly(commandId, data, COMMAND_NAME);
5351
}).verifyComplete();
5452
}
5553

5654
@Test
5755
void shouldReceiveResponse() {
5856
final Mono<Integer> reply = gateway.requestReply(new AsyncQuery<>("double", 42), appName, Integer.class);
5957
StepVerifier.create(reply.timeout(Duration.ofSeconds(15)))
60-
.expectNext(42*2)
61-
.verifyComplete();
58+
.expectNext(42 * 2)
59+
.verifyComplete();
6260
}
6361

64-
6562
@SpringBootApplication
6663
@EnableDirectAsyncGateway
6764
@EnableMessageListeners
68-
static class App{
65+
static class App {
6966
public static void main(String[] args) {
7067
SpringApplication.run(App.class, args);
7168
}
7269

7370
@Bean
74-
public HandlerRegistry registry(UnicastProcessor<Command<Long>> listener) {
71+
public HandlerRegistry registry(Sinks.Many<Command<Long>> listener) {
7572
return HandlerRegistry.register()
76-
.serveQuery("double", rqt -> just(rqt*2), Long.class)
77-
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
73+
.serveQuery("double", rqt -> just(rqt * 2), Long.class)
74+
.handleCommand(COMMAND_NAME, handle(listener), Long.class);
7875
}
7976

8077
@Bean
81-
public UnicastProcessor<Command<Long>> listener() {
82-
return UnicastProcessor.create();
78+
public Sinks.Many<Command<Long>> listener() {
79+
return Sinks.many().unicast().onBackpressureBuffer();
8380
}
8481

85-
private DomainCommandHandler<Long> handle(UnicastProcessor<Command<Long>> listener) {
82+
private DomainCommandHandler<Long> handle(Sinks.Many<Command<Long>> listener) {
8683
return command -> {
87-
listener.onNext(command);
84+
listener.emitNext(command, Sinks.EmitFailureHandler.FAIL_FAST);
8885
return empty();
8986
};
9087
}

acceptance/async-tests/src/test/java/org/reactivecommons/test/SimpleEventNotificationTest.java

+10-12
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import org.springframework.boot.autoconfigure.SpringBootApplication;
1313
import org.springframework.boot.test.context.SpringBootTest;
1414
import org.springframework.context.annotation.Bean;
15-
import reactor.core.publisher.UnicastProcessor;
15+
import reactor.core.publisher.Sinks;
1616
import reactor.test.StepVerifier;
1717

1818
import java.util.concurrent.ThreadLocalRandom;
@@ -30,17 +30,15 @@ class SimpleEventNotificationTest {
3030
@Autowired
3131
private DomainEventBus eventBus;
3232

33-
@Autowired
34-
private UnicastProcessor<DomainEvent<Long>> listener;
35-
36-
private String eventId = ThreadLocalRandom.current().nextInt() + "";
37-
private Long data = ThreadLocalRandom.current().nextLong();
33+
private final String eventId = ThreadLocalRandom.current().nextInt() + "";
34+
private final Long data = ThreadLocalRandom.current().nextLong();
3835

3936
@Test
4037
void shouldReceiveEvent() throws InterruptedException {
4138
DomainEvent<?> event = new DomainEvent<>(EVENT_NAME, eventId, data);
39+
Sinks.Many<DomainEvent<Long>> listener = Sinks.many().unicast().onBackpressureBuffer();
4240
from(eventBus.emit(event)).subscribe();
43-
StepVerifier.create(listener.take(1)).assertNext(evt ->
41+
StepVerifier.create(listener.asFlux().take(1)).assertNext(evt ->
4442
assertThat(evt).extracting(DomainEvent::getName, DomainEvent::getEventId, DomainEvent::getData)
4543
.containsExactly(EVENT_NAME, eventId, data)
4644
).verifyComplete();
@@ -56,20 +54,20 @@ public static void main(String[] args) {
5654
}
5755

5856
@Bean
59-
public HandlerRegistry registry(UnicastProcessor<DomainEvent<Long>> listener) {
57+
public HandlerRegistry registry(Sinks.Many<DomainEvent<Long>> listener) {
6058
return HandlerRegistry.register()
6159
.serveQuery("double", rqt -> just(rqt * 2), Long.class)
6260
.listenEvent(EVENT_NAME, handle(listener), Long.class);
6361
}
6462

6563
@Bean
66-
public UnicastProcessor<DomainEvent<Long>> listener() {
67-
return UnicastProcessor.create();
64+
public Sinks.Many<DomainEvent<Long>> listener() {
65+
return Sinks.many().unicast().onBackpressureBuffer();
6866
}
6967

70-
private DomainEventHandler<Long> handle(UnicastProcessor<DomainEvent<Long>> listener) {
68+
private DomainEventHandler<Long> handle(Sinks.Many<DomainEvent<Long>> listener) {
7169
return command -> {
72-
listener.onNext(command);
70+
listener.emitNext(command, Sinks.EmitFailureHandler.FAIL_FAST);
7371
return empty();
7472
};
7573
}

0 commit comments

Comments
 (0)