diff --git a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs index 93e040d8a..083c19e07 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs @@ -10,6 +10,7 @@ using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Options; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using Microsoft.Extensions.Options; @@ -151,6 +152,10 @@ public Task StoreReceivedMessageAsync(string name, string group, public Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { + if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null && + !_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid()) + return Task.FromResult(0); + var removed = 0; if (table == nameof(PublishedMessages)) { diff --git a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs index 4180a167a..f5b75daa8 100644 --- a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs @@ -9,6 +9,7 @@ using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Options; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using Microsoft.Extensions.Options; @@ -226,6 +227,10 @@ public async Task StoreReceivedMessageAsync(string name, string g public async Task DeleteExpiresAsync(string collection, DateTime timeout, int batchCount = 1000, CancellationToken cancellationToken = default) { + if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null && + !_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid()) + return 0; + if (collection == _options.Value.PublishedCollection) { var publishedCollection = _database.GetCollection(_options.Value.PublishedCollection); diff --git a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs index 19d63ae27..92cf684bc 100644 --- a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs @@ -10,6 +10,7 @@ using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Options; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using Microsoft.EntityFrameworkCore.Storage; @@ -207,6 +208,10 @@ public async Task StoreReceivedMessageAsync(string name, string g public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { + if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null && + !_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid()) + return 0; + var connection = new MySqlConnection(_options.Value.ConnectionString); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteNonQueryAsync( diff --git a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs index c2651d202..d967c464f 100644 --- a/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs +++ b/src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs @@ -33,7 +33,7 @@ public class NATSOptions /// /// Used to setup all NATs client options /// - public Options? Options { get; set; } + public global::NATS.Client.Options? Options { get; set; } public Action? StreamOptions { get; set; } diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index b157aebdd..84b54afb7 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -9,6 +9,7 @@ using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Options; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using Microsoft.EntityFrameworkCore.Storage; @@ -205,6 +206,10 @@ public async Task StoreReceivedMessageAsync(string name, string g public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { + if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null && + !_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid()) + return 0; + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteNonQueryAsync( diff --git a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs index f775709b6..46a06af6e 100644 --- a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs @@ -10,6 +10,7 @@ using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Options; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using Microsoft.Data.SqlClient; @@ -205,6 +206,10 @@ public async Task StoreReceivedMessageAsync(string name, string g public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { + if (_capOptions.Value.DeleteExpiredMessagesValidTimeRange is not null && + !_capOptions.Value.DeleteExpiredMessagesValidTimeRange.CurrentTimeIsValid()) + return 0; + var connection = new SqlConnection(_options.Value.ConnectionString); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteNonQueryAsync( diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 4a3106cdd..cd6b1844a 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -6,6 +6,7 @@ using System.Reflection; using System.Text.Json; using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Options; // ReSharper disable InconsistentNaming @@ -150,6 +151,11 @@ public CapOptions() /// public bool UseStorageLock { get; set; } + /// + /// If this value is set, workers will delete expired messages only within the specified time ranges. + /// + public DeleteExpiredMessagesValidTimeRange? DeleteExpiredMessagesValidTimeRange { get; set; } + /// /// Registers an extension that will be executed when building services. /// diff --git a/src/DotNetCore.CAP/Options/DeleteExpiredMessagesValidTimeRange.cs b/src/DotNetCore.CAP/Options/DeleteExpiredMessagesValidTimeRange.cs new file mode 100644 index 000000000..59f47f162 --- /dev/null +++ b/src/DotNetCore.CAP/Options/DeleteExpiredMessagesValidTimeRange.cs @@ -0,0 +1,38 @@ +using System; + +namespace DotNetCore.CAP.Options +{ + public class DeleteExpiredMessagesValidTimeRange + { + public DeleteExpiredMessagesValidTimeRange(TimeSpan fromHour, TimeSpan toHour) + { + FromHour = fromHour; + ToHour = toHour; + + ThrowExceptionIfTimeRangeIsNotValid(); + } + + public TimeSpan FromHour { get; private set; } + public TimeSpan ToHour { get; private set; } + + void ThrowExceptionIfTimeRangeIsNotValid() + { + if (FromHour >= ToHour) + throw new ArgumentException("FromHour must be less than ToHour."); + } + } + + public static class DeleteExpiredMessagesValidTimeRangeExtension + { + /// + /// Checks if the current time of day falls within the valid range. + /// + /// The time range to check against. + /// Returns true if the current time is within the valid range; otherwise, false. + public static bool CurrentTimeIsValid(this DeleteExpiredMessagesValidTimeRange timeRange) + { + var now = DateTime.Now.TimeOfDay; + return now >= timeRange.FromHour && now <= timeRange.ToHour; + } + } +} diff --git a/test/DotNetCore.CAP.AzureServiceBus.Test/ServiceBusTransportTests.cs b/test/DotNetCore.CAP.AzureServiceBus.Test/ServiceBusTransportTests.cs index 692b1b810..534250964 100644 --- a/test/DotNetCore.CAP.AzureServiceBus.Test/ServiceBusTransportTests.cs +++ b/test/DotNetCore.CAP.AzureServiceBus.Test/ServiceBusTransportTests.cs @@ -24,7 +24,7 @@ public ServiceBusTransportTests() config.ConfigureCustomProducer(cfg => cfg.UseTopic("entity-created").WithSubscription()); - _options = Options.Create(config); + _options = Microsoft.Extensions.Options.Options.Create(config); } [Fact]