diff --git a/EventFlow.sln b/EventFlow.sln index dd72add62..15ce5edb9 100644 --- a/EventFlow.sln +++ b/EventFlow.sln @@ -57,6 +57,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.Hangfire", "Sourc EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.Hangfire.Tests", "Source\EventFlow.Hangfire.Tests\EventFlow.Hangfire.Tests.csproj", "{B4247230-5289-4D17-BB0F-CB2FDBABA988}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "EventStore", "EventStore", "{86B45D9A-8929-4CD2-A471-26169D98ABBA}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.EventStores.EventStore", "Source\EventFlow.EventStores.EventStore\EventFlow.EventStores.EventStore.csproj", "{06CD8016-7E55-43A1-A4F1-89C830810FE9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -131,6 +135,10 @@ Global {B4247230-5289-4D17-BB0F-CB2FDBABA988}.Debug|Any CPU.Build.0 = Debug|Any CPU {B4247230-5289-4D17-BB0F-CB2FDBABA988}.Release|Any CPU.ActiveCfg = Release|Any CPU {B4247230-5289-4D17-BB0F-CB2FDBABA988}.Release|Any CPU.Build.0 = Release|Any CPU + {06CD8016-7E55-43A1-A4F1-89C830810FE9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {06CD8016-7E55-43A1-A4F1-89C830810FE9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {06CD8016-7E55-43A1-A4F1-89C830810FE9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {06CD8016-7E55-43A1-A4F1-89C830810FE9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -158,6 +166,8 @@ Global {2E9CFB92-E8E5-4466-8410-CCB5BA5CB6D1} = {5EE323DE-E69B-451A-8AC3-22DD6A004FBA} {8BC99846-DDDB-40E7-B062-308BF8A1239F} = {2E9CFB92-E8E5-4466-8410-CCB5BA5CB6D1} {B4247230-5289-4D17-BB0F-CB2FDBABA988} = {2E9CFB92-E8E5-4466-8410-CCB5BA5CB6D1} + {86B45D9A-8929-4CD2-A471-26169D98ABBA} = {5EE323DE-E69B-451A-8AC3-22DD6A004FBA} + {06CD8016-7E55-43A1-A4F1-89C830810FE9} = {86B45D9A-8929-4CD2-A471-26169D98ABBA} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {17607E2C-4E8E-45A2-85BD-0A5808E1C0F3} diff --git a/Source/Directory.Build.props b/Source/Directory.Build.props index e1d0c2943..2c61e1761 100644 --- a/Source/Directory.Build.props +++ b/Source/Directory.Build.props @@ -4,7 +4,7 @@ false True - 1701;1702;1705;1591 + 1701;1702;1705;1591;NU1901;NU1902;NU1903;NU1904 false $(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb diff --git a/Source/EventFlow.EventStores.EventStore/EventFlow.EventStores.EventStore.csproj b/Source/EventFlow.EventStores.EventStore/EventFlow.EventStores.EventStore.csproj index 8dff30985..d0550a3ef 100644 --- a/Source/EventFlow.EventStores.EventStore/EventFlow.EventStores.EventStore.csproj +++ b/Source/EventFlow.EventStores.EventStore/EventFlow.EventStores.EventStore.csproj @@ -1,7 +1,6 @@  - - netstandard2.0 + net8.0 True True False @@ -19,12 +18,16 @@ en-US UPDATED BY BUILD + + + + - + - + diff --git a/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs b/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs index d2ba59348..0010a62c8 100644 --- a/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs +++ b/Source/EventFlow.EventStores.EventStore/EventStoreEventPersistence.cs @@ -1,4 +1,4 @@ -// The MIT License (MIT) +// The MIT License (MIT) // // Copyright (c) 2015-2024 Rasmus Mikkelsen // https://github.com/eventflow/EventFlow @@ -20,186 +20,196 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using EventFlow.Aggregates; +using EventFlow.Core; +using EventFlow.Exceptions; +using EventStore.Client; +using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; -using EventFlow.Aggregates; -using EventFlow.Core; -using EventFlow.Exceptions; -using EventFlow.Logs; -using EventStore.ClientAPI; -using EventStore.ClientAPI.Exceptions; -namespace EventFlow.EventStores.EventStore +namespace EventFlow.EventStores.EventStore; + +public class EventStoreEventPersistence(EventStoreClient eventStoreClient, ILogger logger) : IEventPersistence { - public class EventStoreEventPersistence : IEventPersistence + private class EventStoreEvent : ICommittedDomainEvent { - private readonly ILog _log; - private readonly IEventStoreConnection _connection; + public required string AggregateId { get; set; } + public required string Data { get; set; } + public required string Metadata { get; set; } + public required int AggregateSequenceNumber { get; set; } + } - private class EventStoreEvent : ICommittedDomainEvent + public async Task> CommitEventsAsync(IIdentity id, IReadOnlyCollection serializedEvents, CancellationToken cancellationToken) + { + var committedDomainEvents = serializedEvents + .Select(e => new EventStoreEvent + { + AggregateSequenceNumber = e.AggregateSequenceNumber, + Metadata = e.SerializedMetadata, + AggregateId = id.Value, + Data = e.SerializedData + }) + .ToList(); + + //var expectedVersion = Math.Max(serializedEvents.Min(e => e.AggregateSequenceNumber) - 2, ExpectedVersion.NoStream); + var expectedVersion = serializedEvents.Min(e => e.AggregateSequenceNumber) - 2; + + var eventDatas = serializedEvents + .Select(e => + { + // While it might be tempting to use e.Metadata.EventId here, we can't + // as EventStore won't detect optimistic concurrency exceptions then + var uuid = Uuid.NewUuid(); + + var eventType = string.Format("{0}.{1}.{2}", e.Metadata[MetadataKeys.AggregateName], e.Metadata.EventName, e.Metadata.EventVersion); + var data = Encoding.UTF8.GetBytes(e.SerializedData); + var meta = Encoding.UTF8.GetBytes(e.SerializedMetadata); + return new EventData(uuid, eventType, data, meta); + }) + .ToList(); + + try { - public string AggregateId { get; set; } - public string Data { get; set; } - public string Metadata { get; set; } - public int AggregateSequenceNumber { get; set; } - } + IWriteResult writeResult; + if (expectedVersion < 0) + { + writeResult = await eventStoreClient.AppendToStreamAsync( + id.Value, + StreamState.NoStream, + eventDatas, + cancellationToken: cancellationToken).ConfigureAwait(false); + } + else + { + writeResult = await eventStoreClient.AppendToStreamAsync( + id.Value, + StreamRevision.FromInt64(expectedVersion), + eventDatas, + cancellationToken: cancellationToken).ConfigureAwait(false); + } - public EventStoreEventPersistence( - ILog log, - IEventStoreConnection connection) - { - _log = log; - _connection = connection; + logger.LogDebug( + "Wrote entity {0} with version {1} ({2},{3})", + id, + expectedVersion, + writeResult.LogPosition.CommitPosition, + writeResult.LogPosition.PreparePosition); } - - public async Task LoadAllCommittedEvents( - GlobalPosition globalPosition, - int pageSize, - CancellationToken cancellationToken) + catch (WrongExpectedVersionException e) { - var nextPosition = ParsePosition(globalPosition); - var resolvedEvents = new List(); - AllEventsSlice allEventsSlice; - - do - { - allEventsSlice = await _connection.ReadAllEventsForwardAsync(nextPosition, pageSize, false).ConfigureAwait(false); - resolvedEvents.AddRange(allEventsSlice.Events.Where(e => !e.OriginalStreamId.StartsWith("$"))); - nextPosition = allEventsSlice.NextPosition; + throw new OptimisticConcurrencyException(e.Message, e); + } - } - while (resolvedEvents.Count < pageSize && !allEventsSlice.IsEndOfStream); + return committedDomainEvents; + } - var eventStoreEvents = Map(resolvedEvents); + public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken) + { + await eventStoreClient.TombstoneAsync(id.Value, StreamState.Any, cancellationToken: cancellationToken); + } - return new AllCommittedEventsPage( - new GlobalPosition(string.Format("{0}-{1}", nextPosition.CommitPosition, nextPosition.PreparePosition)), - eventStoreEvents); - } + public async Task LoadAllCommittedEvents(GlobalPosition globalPosition, int pageSize, CancellationToken cancellationToken) + { + var nextPosition = ParsePosition(globalPosition); + var resolvedEvents = new List(); - private static Position ParsePosition(GlobalPosition globalPosition) + await foreach (var resolvedEvent in eventStoreClient.ReadAllAsync( + Direction.Forwards, nextPosition, pageSize, cancellationToken: cancellationToken)) { - if (globalPosition.IsStart) + if (!resolvedEvent.OriginalStreamId.StartsWith("$")) { - return Position.Start; + resolvedEvents.Add(resolvedEvent); } + } - var parts = globalPosition.Value.Split('-'); - if (parts.Length != 2) - { - throw new ArgumentException(string.Format( - "Unknown structure for global position '{0}'. Expected it to be empty or in the form 'L-L'", - globalPosition.Value)); - } + var eventStoreEvents = Map(resolvedEvents); - var commitPosition = long.Parse(parts[0]); - var preparePosition = long.Parse(parts[1]); + return new AllCommittedEventsPage( + new GlobalPosition($"{resolvedEvents.Last().Event.Position.CommitPosition}-{resolvedEvents.Last().Event.Position.PreparePosition}"), + eventStoreEvents); + } - return new Position(commitPosition, preparePosition); - } + public async Task> LoadCommittedEventsAsync(IIdentity id, int fromEventSequenceNumber, CancellationToken cancellationToken) + { + return await LoadCommittedEventsAsync(id, fromEventSequenceNumber, 0, cancellationToken); + } + + public async Task> LoadCommittedEventsAsync(IIdentity id, int fromEventSequenceNumber, int toEventSequenceNumber, CancellationToken cancellationToken) + { + var resolvedEvents = new List(); - public async Task> CommitEventsAsync( - IIdentity id, - IReadOnlyCollection serializedEvents, - CancellationToken cancellationToken) + var startPosition = fromEventSequenceNumber <= 1 + ? StreamPosition.Start + : StreamPosition.FromInt64(fromEventSequenceNumber - 1); + + try { - var committedDomainEvents = serializedEvents - .Select(e => new EventStoreEvent - { - AggregateSequenceNumber = e.AggregateSequenceNumber, - Metadata = e.SerializedMetadata, - AggregateId = id.Value, - Data = e.SerializedData - }) - .ToList(); - - var expectedVersion = Math.Max(serializedEvents.Min(e => e.AggregateSequenceNumber) - 2, ExpectedVersion.NoStream); - var eventDatas = serializedEvents - .Select(e => - { - // While it might be tempting to use e.Metadata.EventId here, we can't - // as EventStore won't detect optimistic concurrency exceptions then - var guid = Guid.NewGuid(); - - var eventType = string.Format("{0}.{1}.{2}", e.Metadata[MetadataKeys.AggregateName], e.Metadata.EventName, e.Metadata.EventVersion); - var data = Encoding.UTF8.GetBytes(e.SerializedData); - var meta = Encoding.UTF8.GetBytes(e.SerializedMetadata); - return new EventData(guid, eventType, true, data, meta); - }) - .ToList(); - - try + ReadState result = await eventStoreClient.ReadStreamAsync( + Direction.Forwards, + id.Value, + startPosition, + cancellationToken: cancellationToken + ).ReadState; + + if (result == ReadState.StreamNotFound) { - var writeResult = await _connection.AppendToStreamAsync( - id.Value, - expectedVersion, - eventDatas) - .ConfigureAwait(false); - - _log.Verbose( - "Wrote entity {0} with version {1} ({2},{3})", - id, - writeResult.NextExpectedVersion - 1, - writeResult.LogPosition.CommitPosition, - writeResult.LogPosition.PreparePosition); + // No events, the stream doesn't exist i.e. IsNew + return []; } - catch (WrongExpectedVersionException e) + + await foreach (var resolvedEvent in eventStoreClient.ReadStreamAsync( + Direction.Forwards, + id.Value, + startPosition, + cancellationToken: cancellationToken).WithCancellation(cancellationToken)) { - throw new OptimisticConcurrencyException(e.Message, e); + resolvedEvents.Add(resolvedEvent); } - - return committedDomainEvents; } - - public async Task> LoadCommittedEventsAsync( - IIdentity id, - int fromEventSequenceNumber, - CancellationToken cancellationToken) + catch (Exception ex) { - var streamEvents = new List(); - - StreamEventsSlice currentSlice; - var nextSliceStart = fromEventSequenceNumber <= 1 - ? StreamPosition.Start - : fromEventSequenceNumber - 1; // Starts from zero - - do - { - currentSlice = await _connection.ReadStreamEventsForwardAsync( - id.Value, - nextSliceStart, - 200, - false) - .ConfigureAwait(false); - nextSliceStart = (int)currentSlice.NextEventNumber; - streamEvents.AddRange(currentSlice.Events); - - } - while (!currentSlice.IsEndOfStream); - - return Map(streamEvents); + throw new Exception($"Failed to load committed events for {id.Value}", ex); } - public Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken) + return Map(resolvedEvents); + } + + private static Position ParsePosition(GlobalPosition globalPosition) + { + if (globalPosition.IsStart) { - return _connection.DeleteStreamAsync(id.Value, ExpectedVersion.Any); + return Position.Start; } - private static IReadOnlyCollection Map(IEnumerable resolvedEvents) + var parts = globalPosition.Value.Split('-'); + if (parts.Length != 2) { - return resolvedEvents - .Select(e => new EventStoreEvent - { - AggregateSequenceNumber = (int)(e.Event.EventNumber + 1), // Starts from zero - Metadata = Encoding.UTF8.GetString(e.Event.Metadata), - AggregateId = e.OriginalStreamId, - Data = Encoding.UTF8.GetString(e.Event.Data), - }) - .ToList(); + throw new ArgumentException(string.Format( + "Unknown structure for global position '{0}'. Expected it to be empty or in the form 'L-L'", + globalPosition.Value)); } + + var commitPosition = ulong.Parse(parts[0]); + var preparePosition = ulong.Parse(parts[1]); + + return new Position(commitPosition, preparePosition); + } + + private static List Map(IEnumerable resolvedEvents) + { + return resolvedEvents + .Select(e => new EventStoreEvent + { + AggregateSequenceNumber = (int)(e.Event.EventNumber.ToInt64() + 1), // Starts from zero + Metadata = Encoding.UTF8.GetString(e.Event.Metadata.ToArray()), + AggregateId = e.Event.EventStreamId, + Data = Encoding.UTF8.GetString(e.Event.Data.ToArray()), + }) + .ToList(); } -} \ No newline at end of file +} diff --git a/Source/EventFlow.EventStores.EventStore/Extensions/EventFlowOptionsExtensions.cs b/Source/EventFlow.EventStores.EventStore/Extensions/EventFlowOptionsExtensions.cs index c889fabd1..b189e753a 100644 --- a/Source/EventFlow.EventStores.EventStore/Extensions/EventFlowOptionsExtensions.cs +++ b/Source/EventFlow.EventStores.EventStore/Extensions/EventFlowOptionsExtensions.cs @@ -20,79 +20,15 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -using EventFlow.Configuration; -using EventFlow.Core; using EventFlow.Extensions; -using EventStore.ClientAPI; -using System; -using System.Net; -namespace EventFlow.EventStores.EventStore.Extensions +namespace EventFlow.EventStores.EventStore.Extensions; + +public static class EventFlowOptionsExtensions { - public static class EventFlowOptionsExtensions + public static IEventFlowOptions UseEventStoreEventStore(this IEventFlowOptions eventFlowOptions) { - public static IEventFlowOptions UseEventStoreEventStore( - this IEventFlowOptions eventFlowOptions) - { - return eventFlowOptions.UseEventStore(); - } - - [Obsolete("Use the overloads with 'uri' parameter instead.")] - public static IEventFlowOptions UseEventStoreEventStore( - this IEventFlowOptions eventFlowOptions, - IPEndPoint ipEndPoint) - { - return eventFlowOptions - .UseEventStoreEventStore(ipEndPoint, ConnectionSettings.Default); - } - - [Obsolete("Use the overloads with 'uri' parameter instead.")] - public static IEventFlowOptions UseEventStoreEventStore( - this IEventFlowOptions eventFlowOptions, - IPEndPoint ipEndPoint, - ConnectionSettings connectionSettings) - { - var eventStoreConnection = EventStoreConnection.Create( - connectionSettings, - ipEndPoint, - $"EventFlow v{typeof(EventFlowOptionsExtensions).Assembly.GetName().Version}"); - - using (var a = AsyncHelper.Wait) - { - a.Run(eventStoreConnection.ConnectAsync()); - } - - return eventFlowOptions - .RegisterServices(f => f.Register(r => eventStoreConnection, Lifetime.Singleton)) - .UseEventStore(); - } - - public static IEventFlowOptions UseEventStoreEventStore( - this IEventFlowOptions eventFlowOptions, - Uri uri, - ConnectionSettings connectionSettings, - string connectionNamePrefix = null) - { - var sanitizedConnectionNamePrefix = string.IsNullOrEmpty(connectionNamePrefix) - ? string.Empty - : connectionNamePrefix + " - "; - - var eventStoreConnection = EventStoreConnection.Create( - connectionSettings, - uri, - $"{sanitizedConnectionNamePrefix}EventFlow v{typeof(EventFlowOptionsExtensions).Assembly.GetName().Version}"); - -#pragma warning disable 618 - // TODO: Figure out bootstrapping alternative for 1.0 - using (var a = AsyncHelper.Wait) - { - a.Run(eventStoreConnection.ConnectAsync()); - } -#pragma warning restore 618 - - return eventFlowOptions - .RegisterServices(f => f.Register(r => eventStoreConnection, Lifetime.Singleton)) - .UseEventStore(); - } + return eventFlowOptions + .UseEventPersistence(); } } \ No newline at end of file