Skip to content

Commit e5e8905

Browse files
author
VladislavBakshanskij
committed
added concurrent test for spring
1 parent 86378f8 commit e5e8905

File tree

3 files changed

+75
-4
lines changed

3 files changed

+75
-4
lines changed

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultSequenceGenerator.java

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.gruelbox.transactionoutbox.spi.Utils;
44
import java.sql.ResultSet;
5+
import java.util.function.Predicate;
6+
57
import lombok.AccessLevel;
68
import lombok.Builder;
79
import lombok.RequiredArgsConstructor;

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/Utils.java

+32-4
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@
33
import com.gruelbox.transactionoutbox.ThrowingRunnable;
44
import com.gruelbox.transactionoutbox.UncheckedException;
55
import java.sql.SQLIntegrityConstraintViolationException;
6+
import java.util.ArrayList;
67
import java.util.Arrays;
8+
import java.util.HashSet;
9+
import java.util.List;
10+
import java.util.Set;
711
import java.util.concurrent.Callable;
12+
import java.util.function.Predicate;
813
import java.util.function.Supplier;
914
import java.util.stream.Collectors;
15+
import java.util.stream.Stream;
16+
1017
import lombok.extern.slf4j.Slf4j;
1118
import org.slf4j.Logger;
1219
import org.slf4j.event.Level;
@@ -25,12 +32,33 @@ public static boolean safelyRun(String gerund, ThrowingRunnable runnable) {
2532
}
2633
}
2734

28-
public static boolean indexViolation(Exception e) {
29-
return (e instanceof SQLIntegrityConstraintViolationException)
30-
|| (e.getClass().getName().equals("org.postgresql.util.PSQLException")
35+
public static boolean indexViolation(Throwable e) {
36+
boolean indexViolation = (e instanceof SQLIntegrityConstraintViolationException)
37+
|| (e.getClass().getName().equals("org.postgresql.util.PSQLException")
3138
&& e.getMessage().contains("constraint"))
32-
|| (e.getClass().getName().equals("com.microsoft.sqlserver.jdbc.SQLServerException")
39+
|| (e.getClass().getName().equals("com.microsoft.sqlserver.jdbc.SQLServerException")
3340
&& e.getMessage().contains("duplicate key"));
41+
if (indexViolation) {
42+
return true;
43+
}
44+
45+
// we check cause, because the error can be thrown into the proxy
46+
// and wrapped in java.lang.reflect.UndeclaredThrowableException
47+
Set<Throwable> throwables = new HashSet<>();
48+
Throwable cause = e.getCause();
49+
while (cause != null && throwables.add(cause)) {
50+
if ( (cause instanceof SQLIntegrityConstraintViolationException)
51+
|| (cause.getClass().getName().equals("org.postgresql.util.PSQLException")
52+
&& cause.getMessage().contains("constraint"))
53+
|| (cause.getClass().getName().equals("com.microsoft.sqlserver.jdbc.SQLServerException")
54+
&& cause.getMessage().contains("duplicate key"))) {
55+
return true;
56+
}
57+
58+
cause = cause.getCause();
59+
}
60+
61+
return false;
3462
}
3563

3664
@SuppressWarnings("unused")

transactionoutbox-spring/src/test/java/com/gruelbox/transactionoutbox/spring/example/EventuallyConsistentControllerTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,16 @@
44
import static org.assertj.core.api.Assertions.assertThat;
55
import static org.awaitility.Awaitility.await;
66
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
import static org.junit.jupiter.api.Assertions.fail;
78

89
import java.net.URL;
10+
import java.util.List;
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.stream.Collectors;
15+
import java.util.stream.Stream;
16+
917
import org.junit.jupiter.api.BeforeEach;
1018
import org.junit.jupiter.api.Test;
1119
import org.springframework.beans.factory.annotation.Autowired;
@@ -85,4 +93,37 @@ void testCheckOrdered() {
8593
assertThat(externalQueueService.getSent())
8694
.containsExactly(joe, dave, neil, tupac, jeff));
8795
}
96+
97+
@Test
98+
void testCheckOrderedConcurrent() {
99+
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
100+
101+
try {
102+
var joe = new Customer(1L, "Joe", "Strummer");
103+
var dave = new Customer(2L, "Dave", "Grohl");
104+
var neil = new Customer(3L, "Neil", "Diamond");
105+
var tupac = new Customer(4L, "Tupac", "Shakur");
106+
var jeff = new Customer(5L, "Jeff", "Mills");
107+
108+
List<CompletableFuture<Void>> tasks = Stream.of(joe, dave, neil, tupac, jeff)
109+
.map(customer -> CompletableFuture.runAsync(() -> {
110+
var url = base.toString() + "/customer?ordered=true";
111+
assertTrue(template.postForEntity(url, customer, Void.class).getStatusCode().is2xxSuccessful());
112+
}, executorService).exceptionally((throwable) -> fail("Can't create ordered task for customer " + customer, throwable)))
113+
.collect(Collectors.toUnmodifiableList());
114+
115+
CompletableFuture.allOf(tasks.toArray(CompletableFuture[]::new)).join();
116+
117+
await()
118+
.atMost(10, SECONDS)
119+
.pollDelay(1, SECONDS)
120+
.untilAsserted(
121+
() ->
122+
assertThat(externalQueueService.getSent())
123+
.containsExactlyInAnyOrder(joe, dave, neil, tupac, jeff));
124+
125+
} finally {
126+
executorService.shutdownNow();
127+
}
128+
}
88129
}

0 commit comments

Comments
 (0)