-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provide an interface for generating sequences #678
base: master
Are you sure you want to change the base?
Changes from all commits
7cbe0f5
432f571
647514c
690545b
86378f8
cacd8da
2049a26
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,23 @@ | ||
package com.gruelbox.transactionoutbox; | ||
|
||
import com.gruelbox.transactionoutbox.spi.Utils; | ||
import java.io.IOException; | ||
import java.io.Reader; | ||
import java.io.StringWriter; | ||
import java.io.Writer; | ||
import java.sql.PreparedStatement; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.sql.SQLIntegrityConstraintViolationException; | ||
import java.sql.SQLTimeoutException; | ||
import java.sql.Statement; | ||
import java.sql.Timestamp; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import lombok.AccessLevel; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Builder; | ||
import lombok.experimental.SuperBuilder; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
|
@@ -33,10 +32,12 @@ | |
* equally esoteric, you may prefer to implement {@link Persistor} from the ground up. | ||
*/ | ||
@Slf4j | ||
@SuperBuilder | ||
@AllArgsConstructor(access = AccessLevel.PROTECTED) | ||
public class DefaultPersistor implements Persistor, Validatable { | ||
|
||
private static final int DEFAULT_WRITE_LOCK_TIMEOUT_SECONDS = 2; | ||
private static final String DEFAULT_TABLE_NAME = "TXNO_OUTBOX"; | ||
|
||
private static final String ALL_FIELDS = | ||
"id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; | ||
|
||
|
@@ -47,21 +48,25 @@ public class DefaultPersistor implements Persistor, Validatable { | |
* does not support skip locking. | ||
*/ | ||
@SuppressWarnings("JavaDoc") | ||
@Builder.Default | ||
private final int writeLockTimeoutSeconds = 2; | ||
private final int writeLockTimeoutSeconds; | ||
|
||
/** | ||
* @param dialect The database dialect to use. Required. | ||
*/ | ||
@SuppressWarnings("JavaDoc") | ||
private final Dialect dialect; | ||
|
||
/** | ||
* @param sequenceGenerator The sequence generator used for ordered tasks. Required | ||
*/ | ||
@SuppressWarnings("JavaDoc") | ||
private final SequenceGenerator sequenceGenerator; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On its own, this is a compatibility break; it requires all existing users to specify There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
|
||
/** | ||
* @param tableName The database table name. The default is {@code TXNO_OUTBOX}. | ||
*/ | ||
@SuppressWarnings("JavaDoc") | ||
@Builder.Default | ||
private final String tableName = "TXNO_OUTBOX"; | ||
private final String tableName; | ||
|
||
/** | ||
* @param migrate Set to false to disable automatic database migrations. This may be preferred if | ||
|
@@ -71,23 +76,36 @@ public class DefaultPersistor implements Persistor, Validatable { | |
* the migrations. | ||
*/ | ||
@SuppressWarnings("JavaDoc") | ||
@Builder.Default | ||
private final boolean migrate = true; | ||
private final boolean migrate; | ||
|
||
/** | ||
* @param serializer The serializer to use for {@link Invocation}s. See {@link | ||
* InvocationSerializer} for more information. Defaults to {@link | ||
* InvocationSerializer#createDefaultJsonSerializer()} with no custom serializable classes. | ||
*/ | ||
@SuppressWarnings("JavaDoc") | ||
@Builder.Default | ||
private final InvocationSerializer serializer = | ||
InvocationSerializer.createDefaultJsonSerializer(); | ||
private final InvocationSerializer serializer; | ||
|
||
// for backward compatibility | ||
protected DefaultPersistor(Dialect dialect) { | ||
this( | ||
DEFAULT_WRITE_LOCK_TIMEOUT_SECONDS, | ||
dialect, | ||
DefaultSequenceGenerator.builder().dialect(dialect).build(), | ||
DEFAULT_TABLE_NAME, | ||
true, | ||
InvocationSerializer.createDefaultJsonSerializer()); | ||
} | ||
|
||
public static DefaultPersistorBuilder builder() { | ||
return new DefaultPersistorBuilder(); | ||
} | ||
|
||
@Override | ||
public void validate(Validator validator) { | ||
validator.notNull("dialect", dialect); | ||
validator.notNull("tableName", tableName); | ||
validator.notNull("sequenceGenerator", sequenceGenerator); | ||
} | ||
|
||
@Override | ||
|
@@ -119,8 +137,12 @@ public void save(Transaction tx, TransactionOutboxEntry entry) | |
var writer = new StringWriter(); | ||
serializer.serializeInvocation(entry.getInvocation(), writer); | ||
if (entry.getTopic() != null) { | ||
setNextSequence(tx, entry); | ||
log.info("Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic()); | ||
try { | ||
entry.setSequence(sequenceGenerator.generate(tx, entry.getTopic())); | ||
log.info("Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic()); | ||
} catch (Exception e) { | ||
throw new RuntimeException("Failed to assign sequence number", e); | ||
} | ||
} | ||
PreparedStatement stmt = tx.prepareBatchStatement(insertSql); | ||
setupInsert(entry, writer, stmt); | ||
|
@@ -132,7 +154,7 @@ public void save(Transaction tx, TransactionOutboxEntry entry) | |
stmt.executeUpdate(); | ||
log.debug("Inserted {} immediately", entry.description()); | ||
} catch (Exception e) { | ||
if (indexViolation(e)) { | ||
if (Utils.indexViolation(e)) { | ||
throw new AlreadyScheduledException( | ||
"Request " + entry.description() + " already exists", e); | ||
} | ||
|
@@ -141,47 +163,6 @@ public void save(Transaction tx, TransactionOutboxEntry entry) | |
} | ||
} | ||
|
||
private void setNextSequence(Transaction tx, TransactionOutboxEntry entry) throws SQLException { | ||
//noinspection resource | ||
var seqSelect = tx.prepareBatchStatement(dialect.getFetchNextSequence()); | ||
seqSelect.setString(1, entry.getTopic()); | ||
try (ResultSet rs = seqSelect.executeQuery()) { | ||
if (rs.next()) { | ||
entry.setSequence(rs.getLong(1) + 1L); | ||
//noinspection resource | ||
var seqUpdate = | ||
tx.prepareBatchStatement("UPDATE TXNO_SEQUENCE SET seq = ? WHERE topic = ?"); | ||
seqUpdate.setLong(1, entry.getSequence()); | ||
seqUpdate.setString(2, entry.getTopic()); | ||
seqUpdate.executeUpdate(); | ||
} else { | ||
try { | ||
entry.setSequence(1L); | ||
//noinspection resource | ||
var seqInsert = | ||
tx.prepareBatchStatement("INSERT INTO TXNO_SEQUENCE (topic, seq) VALUES (?, ?)"); | ||
seqInsert.setString(1, entry.getTopic()); | ||
seqInsert.setLong(2, entry.getSequence()); | ||
seqInsert.executeUpdate(); | ||
} catch (Exception e) { | ||
if (indexViolation(e)) { | ||
setNextSequence(tx, entry); | ||
} else { | ||
throw e; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
private boolean indexViolation(Exception e) { | ||
return (e instanceof SQLIntegrityConstraintViolationException) | ||
|| (e.getClass().getName().equals("org.postgresql.util.PSQLException") | ||
&& e.getMessage().contains("constraint")) | ||
|| (e.getClass().getName().equals("com.microsoft.sqlserver.jdbc.SQLServerException") | ||
&& e.getMessage().contains("duplicate key")); | ||
} | ||
|
||
private void setupInsert( | ||
TransactionOutboxEntry entry, StringWriter writer, PreparedStatement stmt) | ||
throws SQLException { | ||
|
@@ -415,4 +396,58 @@ public boolean checkConnection(Transaction tx) throws SQLException { | |
return rs.next() && (rs.getInt(1) == 1); | ||
} | ||
} | ||
|
||
public static class DefaultPersistorBuilder { | ||
private Integer writeLockTimeoutSeconds; | ||
private Dialect dialect; | ||
private SequenceGenerator sequenceGenerator; | ||
private String tableName; | ||
private Boolean migrate; | ||
private InvocationSerializer serializer; | ||
|
||
DefaultPersistorBuilder() {} | ||
|
||
public DefaultPersistorBuilder writeLockTimeoutSeconds(int writeLockTimeoutSeconds) { | ||
this.writeLockTimeoutSeconds = writeLockTimeoutSeconds; | ||
return this; | ||
} | ||
|
||
public DefaultPersistorBuilder dialect(Dialect dialect) { | ||
this.dialect = dialect; | ||
return this; | ||
} | ||
|
||
public DefaultPersistorBuilder sequenceGenerator(SequenceGenerator sequenceGenerator) { | ||
this.sequenceGenerator = sequenceGenerator; | ||
return this; | ||
} | ||
|
||
public DefaultPersistorBuilder tableName(String tableName) { | ||
this.tableName = tableName; | ||
return this; | ||
} | ||
|
||
public DefaultPersistorBuilder migrate(boolean migrate) { | ||
this.migrate = migrate; | ||
return this; | ||
} | ||
|
||
public DefaultPersistorBuilder serializer(InvocationSerializer serializer) { | ||
this.serializer = serializer; | ||
return this; | ||
} | ||
|
||
public DefaultPersistor build() { | ||
|
||
return new DefaultPersistor( | ||
Objects.requireNonNullElse(writeLockTimeoutSeconds, 2), | ||
this.dialect, | ||
Objects.requireNonNullElseGet( | ||
sequenceGenerator, () -> DefaultSequenceGenerator.builder().dialect(dialect).build()), | ||
Objects.requireNonNullElse(tableName, DEFAULT_TABLE_NAME), | ||
Objects.requireNonNullElse(migrate, true), | ||
Objects.requireNonNullElseGet( | ||
serializer, InvocationSerializer::createDefaultJsonSerializer)); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package com.gruelbox.transactionoutbox; | ||
|
||
import com.gruelbox.transactionoutbox.spi.Utils; | ||
import java.sql.ResultSet; | ||
import lombok.AccessLevel; | ||
import lombok.Builder; | ||
import lombok.RequiredArgsConstructor; | ||
|
||
/** Generates a sequence number based on the <i>TXNO_SEQUENCE</i> table in a relational database. */ | ||
@Builder | ||
@RequiredArgsConstructor(access = AccessLevel.PROTECTED) | ||
public class DefaultSequenceGenerator implements SequenceGenerator, Validatable { | ||
private final Dialect dialect; | ||
|
||
@Override | ||
public long generate(Transaction tx, String topic) throws Exception { | ||
//noinspection resource | ||
var seqSelect = tx.prepareBatchStatement(dialect.getFetchNextSequence()); | ||
seqSelect.setString(1, topic); | ||
try (ResultSet rs = seqSelect.executeQuery()) { | ||
long sequence = 1L; | ||
if (rs.next()) { | ||
sequence = rs.getLong(1) + 1; | ||
//noinspection resource | ||
var seqUpdate = | ||
tx.prepareBatchStatement("UPDATE TXNO_SEQUENCE SET seq = ? WHERE topic = ?"); | ||
seqUpdate.setLong(1, sequence); | ||
seqUpdate.setString(2, topic); | ||
seqUpdate.executeUpdate(); | ||
} else { | ||
try { | ||
//noinspection resource | ||
var seqInsert = | ||
tx.prepareBatchStatement("INSERT INTO TXNO_SEQUENCE (topic, seq) VALUES (?, ?)"); | ||
seqInsert.setString(1, topic); | ||
seqInsert.setLong(2, sequence); | ||
seqInsert.executeUpdate(); | ||
} catch (Exception e) { | ||
if (Utils.indexViolation(e)) { | ||
return generate(tx, topic); | ||
} else { | ||
throw e; | ||
} | ||
} | ||
} | ||
|
||
return sequence; | ||
} | ||
} | ||
|
||
@Override | ||
public void validate(Validator validator) { | ||
validator.notNull("dialect", dialect); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,10 @@ public interface Persistor { | |
* @return The persistor. | ||
*/ | ||
static DefaultPersistor forDialect(Dialect dialect) { | ||
return DefaultPersistor.builder().dialect(dialect).build(); | ||
return DefaultPersistor.builder() | ||
.dialect(dialect) | ||
.sequenceGenerator(DefaultSequenceGenerator.builder().dialect(dialect).build()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This shouldn't be necessary (see comments on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
.build(); | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package com.gruelbox.transactionoutbox; | ||
|
||
/** | ||
* Generates sequences for a topic that is used in ordered tasks. For most use cases, just use | ||
* {@link DefaultSequenceGenerator}. | ||
*/ | ||
public interface SequenceGenerator { | ||
/** | ||
* Returns the sequence number for a topic | ||
* | ||
* @param tx The current {@link Transaction} | ||
* @param topic The topic. Can be considered as a key in your storage | ||
* @return The sequence number for a topic | ||
* @throws Exception Any exception | ||
*/ | ||
long generate(Transaction tx, String topic) throws Exception; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be necessary (see comments on
DefaultPersistor
)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed