diff --git a/src/EventStore.Client.Streams/EventStoreClient.Read.cs b/src/EventStore.Client.Streams/EventStoreClient.Read.cs index e4933a327..2bb3f8ab4 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Read.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Read.cs @@ -113,6 +113,79 @@ public ReadStreamResult ReadStreamAsync( return ReadStreamAsync(direction, streamName, revision, maxCount, operationOptions, resolveLinkTos, userCredentials, cancellationToken); } + /// + /// folds a stream using provided aggregator and seed. + /// + /// The type of the folded State. + /// The type of deserialized events. + /// A deserialization function returning zero, one, or multiple events for a the given . + /// An aggregation function returning a new state from last state and current deserialized event. + /// The name of the stream to fold. + /// The of the first event to fold. + /// The seed state to start the aggregation. + /// An to configure the operation's options. + /// Whether to resolve LinkTo events automatically. + /// The optional to perform operation with. + /// The optional . + /// A containing the aggregation result and the of the last event aggregated. + public async ValueTask> FoldStreamAsync( + Func> deserialize, + Func aggregator, + string streamName, + StreamPosition revision, + T seed, + Action? configureOperationOptions = null, + bool resolveLinkTos = false, + UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default) { + var operationOptions = Settings.OperationOptions.Clone(); + configureOperationOptions?.Invoke(operationOptions); + + var readReq = + new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + Stream = ReadReq.Types.Options.Types.StreamOptions.FromStreamNameAndRevision(streamName, revision), + Count = long.MaxValue + } + }; + if (readReq.Options.Filter == null) { + readReq.Options.NoFilter = new Empty(); + } + + readReq.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption { Structured = new Empty() }; + + var call = + _client.Read(readReq, + EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken)) + .ResponseStream.ReadAllAsync().GetAsyncEnumerator(); + + + var hasNext = await call.MoveNextAsync(cancellationToken).ConfigureAwait(false); + + var rev = + hasNext && call.Current.ContentCase == ReadResp.ContentOneofCase.StreamNotFound + ? StreamRevision.None + : hasNext + ? StreamRevision.FromStreamPosition(revision) + : StreamRevision.FromStreamPosition(revision-1); + + while(hasNext) { + if (call.Current.ContentCase == ReadResp.ContentOneofCase.Event) { + var re = ConvertToResolvedEvent(call.Current.Event); + if (re.Event != null) { + rev = StreamRevision.FromStreamPosition(re.Event.EventNumber); + foreach (var e in deserialize(re)) + seed = aggregator(seed, e); + } + } + hasNext = await call.MoveNextAsync(cancellationToken).ConfigureAwait(false); + } + + return new FoldResult(rev, seed); + } + /// /// A class that represents the result of a read operation. /// diff --git a/src/EventStore.Client.Streams/FoldResult.cs b/src/EventStore.Client.Streams/FoldResult.cs new file mode 100644 index 000000000..e60e3b885 --- /dev/null +++ b/src/EventStore.Client.Streams/FoldResult.cs @@ -0,0 +1,27 @@ +#nullable enable +namespace EventStore.Client { + /// + /// Represents the result of a call. + /// + /// The type of the aggregated state. + public struct FoldResult { + /// + /// The position of the last event aggregated. + /// + public StreamRevision Revision { get; } + /// + /// The aggregation result. + /// + public T Value { get; } + + /// + /// Build an instance of . + /// + /// The last event aggregated. + /// The aggregation result. + public FoldResult(StreamRevision revision, T value) { + Revision = revision; + Value = value; + } + } +} diff --git a/test/EventStore.Client.Streams.Tests/fold_stream.cs b/test/EventStore.Client.Streams.Tests/fold_stream.cs new file mode 100644 index 000000000..349f31039 --- /dev/null +++ b/test/EventStore.Client.Streams.Tests/fold_stream.cs @@ -0,0 +1,212 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; +using System.Collections.Generic; + +namespace EventStore.Client { + [Trait("Category", "Network")] + public class fold_stream : IClassFixture { + private readonly Fixture _fixture; + + public fold_stream(Fixture fixture) { + _fixture = fixture; + } + + + [Fact] + public async Task fold_is_called_for_each_event() { + var streamName = _fixture.GetStreamName(); + const int count = 20; + + await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream, + _fixture.CreateTestEvents(count)); + + var result = await _fixture.Client.FoldStreamAsync( + e => new[] { e.Event.EventNumber }, + (acc, e) => { acc.Add(e); return acc; }, + streamName, + StreamPosition.Start, + new List()); + + var expected = + Enumerable.Range(0, count) + .Select(p => new StreamPosition((ulong)p)); + + Assert.Equal(expected, result.Value); + } + + [Fact] + public async Task fold_result_revision_is_last_event() { + var streamName = _fixture.GetStreamName(); + const int count = 20; + + await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream, + _fixture.CreateTestEvents(count)); + + var result = await _fixture.Client.FoldStreamAsync( + e => new[] { e.Event.EventNumber }, + (acc, e) => acc + 1, + streamName, + StreamPosition.Start, + 0); + + + + Assert.Equal(StreamRevision.FromInt64 (count-1), result.Revision); + } + + + [Fact] + public async Task fold_result_revision_is_none_when_stream_doesnt_exist() { + var streamName = _fixture.GetStreamName(); + + var result = await _fixture.Client.FoldStreamAsync( + e => new[] { e.Event.EventNumber }, + (acc, e) => acc + 1, + streamName, + StreamPosition.Start, + 0); + + Assert.Equal(StreamRevision.None, result.Revision); + } + + [Fact] + public async Task fold_result_value_is_seed_when_stream_doesnt_exist() { + var streamName = _fixture.GetStreamName(); + + var seed = new object(); + var result = await _fixture.Client.FoldStreamAsync( + e => new[] { e.Event.EventNumber }, + (acc, e) => new object(), + streamName, + StreamPosition.Start, + seed); + + Assert.Same(seed, result.Value); + } + + + [Fact] + public async Task fold_result_is_seed_if_all_events_are_deserialized_to_empty() { + var streamName = _fixture.GetStreamName(); + const int count = 20; + + await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream, + _fixture.CreateTestEvents(count)); + + var seed = new object(); + var result = await _fixture.Client.FoldStreamAsync( + e => Array.Empty(), + (acc, e) => new object(), + streamName, + StreamPosition.Start, + seed); + + + + Assert.Same(seed, result.Value); + } + + + [Fact] + public async Task fold_starting_after_last_event_has_position() { + var streamName = _fixture.GetStreamName(); + const int pos = 20; + const int count = 20; + + await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream, + _fixture.CreateTestEvents(count)); + + var result = await _fixture.Client.FoldStreamAsync( + e => new[] { e.Event.EventNumber }, + (acc, e) => { acc.Add(e); return acc; }, + streamName, + StreamPosition.FromInt64(pos), + new List()); + + + var expected = StreamRevision.FromInt64(pos-1); + Assert.Equal(expected, result.Revision); + } + + + [Fact] + public async Task fold_starting_from_position_contains_expected_events () { + var streamName = _fixture.GetStreamName(); + const int pos = 10; + const int count = 20; + + await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream, + _fixture.CreateTestEvents(count)); + + var result = await _fixture.Client.FoldStreamAsync( + e => new[] { e.Event.EventNumber }, + (acc, e) => { acc.Add(e); return acc; }, + streamName, + StreamPosition.FromInt64(pos), + new List()); + + + var expected = + Enumerable.Range(pos, count-pos) + .Select(p => new StreamPosition((ulong)p)); + + Assert.Equal(expected, result.Value); + } + + [Fact] + public async Task fold_starting_from_start_contains_expected_events() { + var streamName = _fixture.GetStreamName(); + const int count = 20; + + await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream, + _fixture.CreateTestEvents(count)); + + var result = await _fixture.Client.FoldStreamAsync( + e => new[] { e }, + (acc, e) => { acc.Add(e.Event.EventNumber); return acc; }, + streamName, + StreamPosition.Start, + new List()); + + + var expected = + Enumerable.Range(0, count) + .Select(p => new StreamPosition((ulong)p)); + + Assert.Equal(expected, result.Value); + } + + + [Fact] + public async Task fold_with_deserializer_returning_multiple_events_process_them_in_order() { + var streamName = _fixture.GetStreamName(); + const int count = 20; + + await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream, + _fixture.CreateTestEvents(count)); + + var result = await _fixture.Client.FoldStreamAsync( + e => new[] { (e.Event.EventNumber, 0 ), ( e.Event.EventNumber, 1 ) }, + (acc, e) => { acc.Add(e); return acc; }, + streamName, + StreamPosition.Start, + new List<(StreamPosition,int)>()); + + + var expected = + from p in Enumerable.Range(0, count) + from id in Enumerable.Range(0,2) + select (new StreamPosition((ulong)p), id); + + Assert.Equal(expected, result.Value); + } + + + public class Fixture : EventStoreClientFixture { + protected override Task Given() => Task.CompletedTask; + protected override Task When() => Task.CompletedTask; + } + } +}