Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox Improvements for Relation Database #3464

Merged
merged 9 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions samples/WebAPI/WebAPI_Common/DbMaker/SchemaCreation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,14 @@ private static void CreateOutboxPostgres(string? connectionString, bool hasBinar
sqlConnection.Open();

using NpgsqlCommand existsQuery = sqlConnection.CreateCommand();
existsQuery.CommandText = PostgreSqlOutboxBulder.GetExistsQuery(OUTBOX_TABLE_NAME);
existsQuery.CommandText = PostgreSqlOutboxBuilder.GetExistsQuery(OUTBOX_TABLE_NAME);
object? findOutbox = existsQuery.ExecuteScalar();
bool exists = findOutbox is long and > 0;

if (exists) return;

using NpgsqlCommand command = sqlConnection.CreateCommand();
command.CommandText = PostgreSqlOutboxBulder.GetDDL(OUTBOX_TABLE_NAME, hasBinaryPayload);
command.CommandText = PostgreSqlOutboxBuilder.GetDDL(OUTBOX_TABLE_NAME, hasBinaryPayload);
command.ExecuteScalar();
}

Expand Down
163 changes: 95 additions & 68 deletions src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ public class MsSqlOutbox : RelationDatabaseOutbox
private const int MsSqlDuplicateKeyError_UniqueConstraintViolation = 2627;
private readonly IAmARelationalDatabaseConfiguration _configuration;
private readonly IAmARelationalDbConnectionProvider _connectionProvider;

/// <summary>
/// Initializes a new instance of the <see cref="MsSqlOutbox" /> class.
/// </summary>
/// <param name="configuration">The configuration.</param>
/// <param name="connectionProvider">The connection factory.</param>
public MsSqlOutbox(IAmARelationalDatabaseConfiguration configuration, IAmARelationalDbConnectionProvider connectionProvider) : base(
public MsSqlOutbox(IAmARelationalDatabaseConfiguration configuration,
IAmARelationalDbConnectionProvider connectionProvider) : base(
configuration.OutBoxTableName, new MsSqlQueries(), ApplicationLogging.CreateLogger<MsSqlOutbox>())
{
_configuration = configuration;
Expand All @@ -70,7 +71,7 @@ public MsSqlOutbox(IAmARelationalDatabaseConfiguration configuration) : this(con

protected override void WriteToStore(
IAmABoxTransactionProvider<DbTransaction> transactionProvider,
Func<DbConnection, DbCommand> commandFunc,
Func<DbConnection, DbCommand> commandFunc,
Action loggingAction)
{
var connection = GetOpenConnection(_connectionProvider, transactionProvider);
Expand All @@ -86,7 +87,6 @@ protected override void WriteToStore(
if (sqlException.Number != MsSqlDuplicateKeyError_UniqueIndexViolation &&
sqlException.Number != MsSqlDuplicateKeyError_UniqueConstraintViolation) throw;
loggingAction.Invoke();

}
finally
{
Expand All @@ -96,8 +96,8 @@ protected override void WriteToStore(

protected override async Task WriteToStoreAsync(
IAmABoxTransactionProvider<DbTransaction> transactionProvider,
Func<DbConnection, DbCommand> commandFunc,
Action loggingAction,
Func<DbConnection, DbCommand> commandFunc,
Action loggingAction,
CancellationToken cancellationToken)
{
var connection = await GetOpenConnectionAsync(_connectionProvider, transactionProvider, cancellationToken);
Expand Down Expand Up @@ -128,7 +128,7 @@ protected override async Task WriteToStoreAsync(
protected override T ReadFromStore<T>(
Func<DbConnection, DbCommand> commandFunc,
Func<DbDataReader, T> resultFunc
)
)
{
var connection = _connectionProvider.GetConnection();

Expand All @@ -149,7 +149,7 @@ protected override async Task<T> ReadFromStoreAsync<T>(
Func<DbConnection, DbCommand> commandFunc,
Func<DbDataReader, Task<T>> resultFunc,
CancellationToken cancellationToken
)
)
{
var connection = await _connectionProvider.GetConnectionAsync(cancellationToken);

Expand All @@ -167,11 +167,11 @@ CancellationToken cancellationToken
}

protected override DbCommand CreateCommand(
DbConnection connection,
string sqlText,
DbConnection connection,
string sqlText,
int outBoxTimeout,
params IDbDataParameter[] parameters
)
)
{
var command = connection.CreateCommand();

Expand All @@ -182,18 +182,32 @@ params IDbDataParameter[] parameters
return command;
}

protected override IDbDataParameter[] CreatePagedOutstandingParameters(double milliSecondsSinceAdded, int pageSize,
protected override IDbDataParameter[] CreatePagedOutstandingParameters(TimeSpan since, int pageSize,
int pageNumber)
{
var parameters = new IDbDataParameter[3];
parameters[0] =
new SqlParameter { ParameterName = "PageNumber", Value = (object)pageNumber ?? DBNull.Value };
parameters[1] = new SqlParameter { ParameterName = "PageSize", Value = (object)pageSize ?? DBNull.Value };
parameters[2] = new SqlParameter
parameters[0] = new SqlParameter { ParameterName = "PageNumber", Value = pageNumber };
parameters[1] = new SqlParameter { ParameterName = "PageSize", Value = pageSize };
parameters[2] = CreateSqlParameter("DispatchedSince", DateTimeOffset.UtcNow.Subtract(since));
return parameters;
}

protected override IDbDataParameter[] CreatePagedDispatchedParameters(TimeSpan dispatchedSince, int pageSize,
int pageNumber)
{
ParameterName = "OutstandingSince", Value = (object)milliSecondsSinceAdded ?? DBNull.Value
};
var parameters = new IDbDataParameter[3];
parameters[0] = new SqlParameter { ParameterName = "PageNumber", Value = pageNumber };
parameters[1] = new SqlParameter { ParameterName = "PageSize", Value = pageSize };
parameters[2] = CreateSqlParameter("DispatchedSince", DateTimeOffset.UtcNow.Subtract(dispatchedSince));

return parameters;
}

protected override IDbDataParameter[] CreatePagedReadParameters(int pageSize, int pageNumber)
{
var parameters = new IDbDataParameter[2];
parameters[0] = new SqlParameter { ParameterName = "PageNumber", Value = pageNumber };
parameters[1] = new SqlParameter { ParameterName = "PageSize", Value = pageSize };
return parameters;
}

Expand All @@ -203,7 +217,7 @@ protected override IDbDataParameter CreateSqlParameter(string parameterName, obj
{
return new SqlParameter { ParameterName = parameterName, Value = value ?? DBNull.Value };
}

protected override IDbDataParameter[] InitAddDbParameters(Message message, int? position = null)
{
var prefix = position.HasValue ? $"p{position}_" : "";
Expand All @@ -212,7 +226,7 @@ protected override IDbDataParameter[] InitAddDbParameters(Message message, int?
{
new SqlParameter
{
ParameterName = $"{prefix}MessageId",
ParameterName = $"{prefix}MessageId",
DbType = DbType.String,
Value = (object)message.Id ?? DBNull.Value
},
Expand All @@ -224,7 +238,7 @@ protected override IDbDataParameter[] InitAddDbParameters(Message message, int?
},
new SqlParameter
{
ParameterName = $"{prefix}Topic",
ParameterName = $"{prefix}Topic",
DbType = DbType.String,
Value = (object)message.Header.Topic.Value ?? DBNull.Value
},
Expand All @@ -242,7 +256,7 @@ protected override IDbDataParameter[] InitAddDbParameters(Message message, int?
},
new SqlParameter
{
ParameterName = $"{prefix}ReplyTo",
ParameterName = $"{prefix}ReplyTo",
DbType = DbType.String,
Value = (object)message.Header.ReplyTo ?? DBNull.Value
},
Expand All @@ -258,24 +272,23 @@ protected override IDbDataParameter[] InitAddDbParameters(Message message, int?
DbType = DbType.String,
Value = (object)message.Header.PartitionKey ?? DBNull.Value
},
new SqlParameter { ParameterName = $"{prefix}HeaderBag",
Value = (object)bagJson ?? DBNull.Value },
new SqlParameter { ParameterName = $"{prefix}HeaderBag", Value = (object)bagJson ?? DBNull.Value },
_configuration.BinaryMessagePayload
? new SqlParameter
{
ParameterName = $"{prefix}Body",
ParameterName = $"{prefix}Body",
DbType = DbType.Binary,
Value = (object)message.Body?.Bytes ?? DBNull.Value
}
: new SqlParameter
{
ParameterName = $"{prefix}Body",
ParameterName = $"{prefix}Body",
DbType = DbType.String,
Value = (object)message.Body?.Value ?? DBNull.Value
}
};
}

#endregion

#region Property Extractors
Expand All @@ -287,22 +300,22 @@ private static MessageType GetMessageType(DbDataReader dr) =>

private static string GetMessageId(DbDataReader dr) => dr.GetString(dr.GetOrdinal("MessageId"));

private string GetContentType(DbDataReader dr)
private static string GetContentType(DbDataReader dr)
{
var ordinal = dr.GetOrdinal("ContentType");
if (dr.IsDBNull(ordinal)) return null;
if (dr.IsDBNull(ordinal)) return null;

var contentType = dr.GetString(ordinal);
return contentType;
}

private string GetReplyTo(DbDataReader dr)
private static string GetReplyTo(DbDataReader dr)
{
var ordinal = dr.GetOrdinal("ReplyTo");
if (dr.IsDBNull(ordinal)) return null;
var replyTo = dr.GetString(ordinal);
return replyTo;
var ordinal = dr.GetOrdinal("ReplyTo");
if (dr.IsDBNull(ordinal)) return null;

var replyTo = dr.GetString(ordinal);
return replyTo;
}

private static Dictionary<string, object> GetContextBag(DbDataReader dr)
Expand All @@ -314,11 +327,11 @@ private static Dictionary<string, object> GetContextBag(DbDataReader dr)
return dictionaryBag;
}

private string GetCorrelationId(DbDataReader dr)
private static string GetCorrelationId(DbDataReader dr)
{
var ordinal = dr.GetOrdinal("CorrelationId");
if (dr.IsDBNull(ordinal)) return null;
if (dr.IsDBNull(ordinal)) return null;

var correlationId = dr.GetString(ordinal);
return correlationId;
}
Expand All @@ -332,7 +345,7 @@ private static DateTimeOffset GetTimeStamp(DbDataReader dr)
return timeStamp;
}

private string GetPartitionKey(DbDataReader dr)
private static string GetPartitionKey(DbDataReader dr)
{
var ordinal = dr.GetOrdinal("PartitionKey");
if (dr.IsDBNull(ordinal)) return null;
Expand All @@ -341,7 +354,7 @@ private string GetPartitionKey(DbDataReader dr)
return partitionKey;
}

private byte[] GetBodyAsBytes(DbDataReader dr)
private static byte[] GetBodyAsBytes(DbDataReader dr)
{
var ordinal = dr.GetOrdinal("Body");
if (dr.IsDBNull(ordinal)) return null;
Expand All @@ -355,10 +368,10 @@ private byte[] GetBodyAsBytes(DbDataReader dr)
var bytesRead = body.Read(buffer, 0, (int)bodyLength);
bytesRemaining -= bytesRead;
}

return buffer;
}

private static string GetBodyAsText(DbDataReader dr)
{
var ordinal = dr.GetOrdinal("Body");
Expand All @@ -381,7 +394,7 @@ protected override Message MapFunction(DbDataReader dr)

return message ?? new Message();
}

protected override async Task<Message> MapFunctionAsync(DbDataReader dr, CancellationToken cancellationToken)
{
Message message = null;
Expand All @@ -390,7 +403,7 @@ protected override async Task<Message> MapFunctionAsync(DbDataReader dr, Cancell
message = MapAMessage(dr);
}

dr.Close();
await dr.CloseAsync();

return message ?? new Message();
}
Expand All @@ -411,26 +424,40 @@ protected override IEnumerable<Message> MapListFunction(DbDataReader dr)
protected override async Task<IEnumerable<Message>> MapListFunctionAsync(
DbDataReader dr,
CancellationToken cancellationToken
)
)
{
var messages = new List<Message>();
while (await dr.ReadAsync(cancellationToken))
{
messages.Add(MapAMessage(dr));
}

dr.Close();
await dr.CloseAsync();

return messages;
}

protected override async Task<int> MapOutstandingCountAsync(DbDataReader dr, CancellationToken cancellationToken)
protected override async Task<int> MapOutstandingCountAsync(DbDataReader dr,
CancellationToken cancellationToken)
{
int outstandingMessages = -1;
if (await dr.ReadAsync(cancellationToken))
{
outstandingMessages = dr.GetInt32(0);
}

await dr.CloseAsync();
return outstandingMessages;
}

protected override int MapOutstandingCount(DbDataReader dr)
{
int outstandingMessages = -1;
if(await dr.ReadAsync(cancellationToken))
if (dr.Read())
{
outstandingMessages = dr.GetInt32(0);
}

dr.Close();
return outstandingMessages;
}
Expand All @@ -445,32 +472,32 @@ private Message MapAMessage(DbDataReader dr)

var header = new MessageHeader(id, topic, messageType);

DateTimeOffset timeStamp = GetTimeStamp(dr);
var correlationId = GetCorrelationId(dr);
var replyTo = GetReplyTo(dr);
var contentType = GetContentType(dr);
DateTimeOffset timeStamp = GetTimeStamp(dr);
var correlationId = GetCorrelationId(dr);
var replyTo = GetReplyTo(dr);
var contentType = GetContentType(dr);
var partitionKey = GetPartitionKey(dr);

header = new MessageHeader(
messageId: id,
topic: topic,
messageType: messageType,
timeStamp: timeStamp,
handledCount: 0,
delayed: TimeSpan.Zero,
correlationId: correlationId,
replyTo: new RoutingKey(replyTo),
header = new MessageHeader(
messageId: id,
topic: topic,
messageType: messageType,
timeStamp: timeStamp,
handledCount: 0,
delayed: TimeSpan.Zero,
correlationId: correlationId,
replyTo: new RoutingKey(replyTo),
contentType: contentType,
partitionKey: partitionKey);

Dictionary<string, object> dictionaryBag = GetContextBag(dr);
if (dictionaryBag != null)
Dictionary<string, object> dictionaryBag = GetContextBag(dr);
if (dictionaryBag != null)
{
foreach (var key in dictionaryBag.Keys)
{
foreach (var key in dictionaryBag.Keys)
{
header.Bag.Add(key, dictionaryBag[key]);
}
header.Bag.Add(key, dictionaryBag[key]);
}
}

var bodyOrdinal = dr.GetOrdinal("Body");
string messageBody = string.Empty;
Expand Down
Loading
Loading