Skip to content

feat: Add option for valid time range to delete expired messages #1614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Options;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -151,6 +152,10 @@ public Task<MediumMessage> StoreReceivedMessageAsync(string name, string group,
public Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default)
{
if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null &&
!_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid())
return Task.FromResult(0);

var removed = 0;
if (table == nameof(PublishedMessages))
{
Expand Down
5 changes: 5 additions & 0 deletions src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Options;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -226,6 +227,10 @@ public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string g
public async Task<int> DeleteExpiresAsync(string collection, DateTime timeout, int batchCount = 1000,
CancellationToken cancellationToken = default)
{
if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null &&
!_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid())
return 0;

if (collection == _options.Value.PublishedCollection)
{
var publishedCollection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
Expand Down
5 changes: 5 additions & 0 deletions src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Options;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.EntityFrameworkCore.Storage;
Expand Down Expand Up @@ -207,6 +208,10 @@ public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string g
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default)
{
if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null &&
!_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid())
return 0;

var connection = new MySqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteNonQueryAsync(
Expand Down
2 changes: 1 addition & 1 deletion src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class NATSOptions
/// <summary>
/// Used to setup all NATs client options
/// </summary>
public Options? Options { get; set; }
public global::NATS.Client.Options? Options { get; set; }

public Action<StreamConfiguration.StreamConfigurationBuilder>? StreamOptions { get; set; }

Expand Down
5 changes: 5 additions & 0 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Options;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.EntityFrameworkCore.Storage;
Expand Down Expand Up @@ -205,6 +206,10 @@ public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string g
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default)
{
if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null &&
!_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid())
return 0;

var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteNonQueryAsync(
Expand Down
5 changes: 5 additions & 0 deletions src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Monitoring;
using DotNetCore.CAP.Options;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Serialization;
using Microsoft.Data.SqlClient;
Expand Down Expand Up @@ -205,6 +206,10 @@ public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string g
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default)
{
if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null &&
!_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid())
return 0;

var connection = new SqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteNonQueryAsync(
Expand Down
6 changes: 6 additions & 0 deletions src/DotNetCore.CAP/CAP.Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Reflection;
using System.Text.Json;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Options;

// ReSharper disable InconsistentNaming

Expand Down Expand Up @@ -150,6 +151,11 @@ public CapOptions()
/// </summary>
public bool UseStorageLock { get; set; }

/// <summary>
/// If this value is set, workers will delete expired messages only within the specified time ranges.
/// </summary>
public DeleteExpiredMessagesValidTimeRange? DeleteExpiredMessagesValidTimeRange { get; set; }

/// <summary>
/// Registers an extension that will be executed when building services.
/// </summary>
Expand Down
38 changes: 38 additions & 0 deletions src/DotNetCore.CAP/Options/DeleteExpiredMessagesValidTimeRange.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;

namespace DotNetCore.CAP.Options
{
public class DeleteExpiredMessagesValidTimeRange
{
public DeleteExpiredMessagesValidTimeRange(TimeSpan fromHour, TimeSpan toHour)
{
FromHour = fromHour;
ToHour = toHour;

ThrowExceptionIfTimeRangeIsNotValid();
}

public TimeSpan FromHour { get; private set; }
public TimeSpan ToHour { get; private set; }

void ThrowExceptionIfTimeRangeIsNotValid()
{
if (FromHour >= ToHour)
throw new ArgumentException("FromHour must be less than ToHour.");
}
}

public static class DeleteExpiredMessagesValidTimeRangeExtension
{
/// <summary>
/// Checks if the current time of day falls within the valid range.
/// </summary>
/// <param name="timeRange">The time range to check against.</param>
/// <returns>Returns true if the current time is within the valid range; otherwise, false.</returns>
public static bool CurrentTimeIsValid(this DeleteExpiredMessagesValidTimeRange timeRange)
{
var now = DateTime.Now.TimeOfDay;
return now >= timeRange.FromHour && now <= timeRange.ToHour;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public ServiceBusTransportTests()

config.ConfigureCustomProducer<EntityCreated>(cfg => cfg.UseTopic("entity-created").WithSubscription());

_options = Options.Create(config);
_options = Microsoft.Extensions.Options.Options.Create(config);
}

[Fact]
Expand Down