Skip to content

Commit 9c3e864

Browse files
author
Graham Crockford
committed
Support for checking entry state
1 parent 4d39d26 commit 9c3e864

File tree

11 files changed

+433
-76
lines changed

11 files changed

+433
-76
lines changed

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java

+55-46
Original file line numberDiff line numberDiff line change
@@ -217,37 +217,39 @@ private Class<?> toClass(ClassLoader classLoader, String name) {
217217
public JsonElement serialize(Invocation src, Type typeOfSrc, JsonSerializationContext context) {
218218
if (version == 1) {
219219
log.warn("Serializing as deprecated version {}", version);
220-
return serializeV1(src, typeOfSrc, context);
220+
return serializeV1(src, context);
221221
}
222222
JsonObject obj = new JsonObject();
223223
obj.addProperty("c", src.getClassName());
224224
obj.addProperty("m", src.getMethodName());
225225
JsonArray params = new JsonArray();
226-
JsonArray args = new JsonArray();
227-
int i = 0;
228-
for (Class<?> parameterType : src.getParameterTypes()) {
229-
params.add(nameForClass(parameterType));
230-
Object arg = src.getArgs()[i];
231-
if (arg == null) {
232-
JsonObject jsonObject = new JsonObject();
233-
jsonObject.add("t", null);
234-
jsonObject.add("v", null);
235-
args.add(jsonObject);
236-
} else {
237-
JsonObject jsonObject = new JsonObject();
238-
jsonObject.addProperty("t", nameForClass(arg.getClass()));
239-
jsonObject.add("v", context.serialize(arg));
240-
args.add(jsonObject);
226+
if (src.getParameterTypes().length > 0) {
227+
JsonArray args = new JsonArray();
228+
int i = 0;
229+
for (Class<?> parameterType : src.getParameterTypes()) {
230+
params.add(nameForClass(parameterType));
231+
Object arg = src.getArgs()[i];
232+
if (arg == null) {
233+
JsonObject jsonObject = new JsonObject();
234+
jsonObject.add("t", null);
235+
jsonObject.add("v", null);
236+
args.add(jsonObject);
237+
} else {
238+
JsonObject jsonObject = new JsonObject();
239+
jsonObject.addProperty("t", nameForClass(arg.getClass()));
240+
jsonObject.add("v", context.serialize(arg));
241+
args.add(jsonObject);
242+
}
243+
i++;
241244
}
242-
i++;
245+
obj.add("a", args);
243246
}
244247
obj.add("p", params);
245-
obj.add("a", args);
246248
obj.add("x", context.serialize(src.getMdc()));
247249
return obj;
248250
}
249251

250-
JsonElement serializeV1(Invocation src, Type typeOfSrc, JsonSerializationContext context) {
252+
JsonElement serializeV1(Invocation src, JsonSerializationContext context) {
251253
JsonObject obj = new JsonObject();
252254
obj.addProperty("c", src.getClassName());
253255
obj.addProperty("m", src.getMethodName());
@@ -274,39 +276,46 @@ public Invocation deserialize(
274276
String className = jsonObject.get("c").getAsString();
275277
String methodName = jsonObject.get("m").getAsString();
276278

277-
JsonArray jsonParams = jsonObject.get("p").getAsJsonArray();
278-
Class<?>[] params = new Class<?>[jsonParams.size()];
279-
for (int i = 0; i < jsonParams.size(); i++) {
280-
JsonElement param = jsonParams.get(i);
281-
if (param.isJsonObject()) {
282-
// For backwards compatibility
283-
params[i] = classForName(param.getAsJsonObject().get("t").getAsString());
284-
} else {
285-
params[i] = classForName(param.getAsString());
279+
Class<?>[] params = null;
280+
if (jsonObject.has("p")) {
281+
JsonArray jsonParams = jsonObject.get("p").getAsJsonArray();
282+
params = new Class<?>[jsonParams.size()];
283+
for (int i = 0; i < jsonParams.size(); i++) {
284+
JsonElement param = jsonParams.get(i);
285+
if (param.isJsonObject()) {
286+
// For backwards compatibility
287+
params[i] = classForName(param.getAsJsonObject().get("t").getAsString());
288+
} else {
289+
params[i] = classForName(param.getAsString());
290+
}
286291
}
287292
}
288293

289-
JsonElement argsElement = jsonObject.get("a");
290-
if (argsElement == null) {
291-
// For backwards compatibility
292-
argsElement = jsonObject.get("p");
293-
}
294-
JsonArray jsonArgs = argsElement.getAsJsonArray();
295-
Object[] args = new Object[jsonArgs.size()];
296-
for (int i = 0; i < jsonArgs.size(); i++) {
297-
JsonElement arg = jsonArgs.get(i);
298-
JsonElement argType = arg.getAsJsonObject().get("t");
299-
if (argType != null) {
300-
JsonElement argValue = arg.getAsJsonObject().get("v");
301-
Class<?> argClass = classForName(argType.getAsString());
302-
try {
303-
args[i] = context.deserialize(argValue, argClass);
304-
} catch (Exception e) {
305-
throw new RuntimeException(
306-
"Failed to deserialize arg [" + argValue + "] of type [" + argType + "]", e);
294+
Object[] args = null;
295+
if (jsonObject.has("a")) {
296+
JsonElement argsElement = jsonObject.get("a");
297+
if (argsElement == null) {
298+
// For backwards compatibility
299+
argsElement = jsonObject.get("p");
300+
}
301+
JsonArray jsonArgs = argsElement.getAsJsonArray();
302+
args = new Object[jsonArgs.size()];
303+
for (int i = 0; i < jsonArgs.size(); i++) {
304+
JsonElement arg = jsonArgs.get(i);
305+
JsonElement argType = arg.getAsJsonObject().get("t");
306+
if (argType != null) {
307+
JsonElement argValue = arg.getAsJsonObject().get("v");
308+
Class<?> argClass = classForName(argType.getAsString());
309+
try {
310+
args[i] = context.deserialize(argValue, argClass);
311+
} catch (Exception e) {
312+
throw new RuntimeException(
313+
"Failed to deserialize arg [" + argValue + "] of type [" + argType + "]", e);
314+
}
307315
}
308316
}
309317
}
318+
310319
Map<String, String> mdc = context.deserialize(jsonObject.get("x"), Map.class);
311320

312321
return new Invocation(className, methodName, params, args, mdc);

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java

+13
Original file line numberDiff line numberDiff line change
@@ -427,4 +427,17 @@ public boolean checkConnection(Transaction tx) throws SQLException {
427427
return rs.next() && (rs.getInt(1) == 1);
428428
}
429429
}
430+
431+
@Override
432+
public Optional<TransactionOutboxEntry> load(Transaction tx, String entryId) throws Exception {
433+
//noinspection resource
434+
try (PreparedStatement stmt =
435+
tx.connection()
436+
.prepareStatement("SELECT " + ALL_FIELDS + " FROM " + tableName + " WHERE id = ?")) {
437+
stmt.setString(1, entryId);
438+
List<TransactionOutboxEntry> results = new ArrayList<>(1);
439+
gatherResults(stmt, results);
440+
return results.stream().findFirst();
441+
}
442+
}
430443
}

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Persistor.java

+9
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,13 @@ List<TransactionOutboxEntry> selectBatch(Transaction tx, int batchSize, Instant
150150
* @param tx The current {@link Transaction}.
151151
*/
152152
void clear(Transaction tx) throws Exception;
153+
154+
/**
155+
* Loads the specified entry.
156+
*
157+
* @param tx The current {@link Transaction}.
158+
* @param entryId The entry id.
159+
* @return The entry, or empty if it does not exist.
160+
*/
161+
Optional<TransactionOutboxEntry> load(Transaction tx, String entryId) throws Exception;
153162
}

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/StubPersistor.java

+5
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now)
6464
@Override
6565
public void clear(Transaction tx) {}
6666

67+
@Override
68+
public Optional<TransactionOutboxEntry> load(Transaction tx, String entryId) {
69+
return Optional.empty();
70+
}
71+
6772
@Override
6873
public boolean checkConnection(Transaction tx) {
6974
return true;

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java

+63
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import java.time.Clock;
44
import java.time.Duration;
5+
import java.util.Optional;
56
import java.util.concurrent.Executor;
7+
import java.util.function.Consumer;
68
import java.util.function.Supplier;
79
import lombok.ToString;
810
import org.slf4j.MDC;
@@ -135,6 +137,51 @@ default boolean flush() {
135137
@SuppressWarnings("WeakerAccess")
136138
void processNow(TransactionOutboxEntry entry);
137139

140+
/**
141+
* Loads the specified queued test. This can be used to check the status.
142+
*
143+
* <p>Requires the transaction manager in use to be a {@link
144+
* ThreadLocalContextTransactionManager}.
145+
*
146+
* <p>Note that nothing may be returned for four distinct reasons:
147+
*
148+
* <ul>
149+
* <li>Such a task has never been created and is therefore unknown
150+
* <li>The task was created, and successfully completed, but had no {@code uniqueRequestId} so
151+
* was deleted.
152+
* <li>The task was created, and completed, and had a {@code uniqueRequestId}, but the {@code
153+
* retentionThreshold} has passed so the record has been deleted
154+
* <li>The task was created, and may have succeeded or failed but was deleted from the database
155+
* by a user for some reason.
156+
* </ul>
157+
*
158+
* @param entryId The entry id.
159+
* @return Empty if no such entry exists, otherwise the entry.
160+
*/
161+
Optional<TransactionOutboxEntry> fetchEntry(String entryId);
162+
163+
/**
164+
* Loads the specified queued request. This can be used to check the status of a request.
165+
*
166+
* <p>Note that nothing may be returned for four distinct reasons:
167+
*
168+
* <ul>
169+
* <li>Such a task has never been created and is therefore unknown
170+
* <li>The task was created, and successfully completed, but had no {@code uniqueRequestId} so
171+
* was deleted.
172+
* <li>The task was created, and completed, and had a {@code uniqueRequestId}, but the {@code
173+
* retentionThreshold} has passed so the record has been deleted
174+
* <li>The task was created, and may have succeeded or failed but was deleted from the database
175+
* by a user for some reason.
176+
* </ul>
177+
*
178+
* @param entryId The entry id.
179+
* @param transactionContext The transaction context (if using a {@link
180+
* ParameterContextTransactionManager}).
181+
* @return Empty if no such entry exists, otherwise the entry.
182+
*/
183+
Optional<TransactionOutboxEntry> fetchEntry(String entryId, Object transactionContext);
184+
138185
/** Builder for {@link TransactionOutbox}. */
139186
@ToString
140187
abstract class TransactionOutboxBuilder {
@@ -368,6 +415,22 @@ interface ParameterizedScheduleBuilder {
368415
*/
369416
ParameterizedScheduleBuilder ordered(String topic);
370417

418+
/**
419+
* If set, {@code callback} will be called in the current thread immediately after writing the
420+
* entry to the database, so that the details of the record can be inspected or the {@code id}
421+
* retained to be able to check the status with {@link TransactionOutbox#fetchEntry(String)}.
422+
*
423+
* <p>Note that a successful callback does not necessarily mean that the entry has, or will
424+
* ever, be processed. The current transaction could be rolled back rather than committed. If
425+
* this occurs, {@link TransactionOutbox#fetchEntry(String)} will simply return empty.
426+
*
427+
* @param callback Receives the {@link TransactionOutboxEntry} once it has been inserted into
428+
* the database (though this insert may be rolled back and will not be processed if this
429+
* happens).
430+
* @return Builder.
431+
*/
432+
ParameterizedScheduleBuilder entryCallback(Consumer<TransactionOutboxEntry> callback);
433+
371434
/**
372435
* Equivalent to {@link TransactionOutbox#schedule(Class)}, but applying additional parameters
373436
* to the request as configured using {@link TransactionOutbox#with()}.

0 commit comments

Comments
 (0)