Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: FoldStreamAsync #89

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,79 @@ public ReadStreamResult ReadStreamAsync(
return ReadStreamAsync(direction, streamName, revision, maxCount, operationOptions, resolveLinkTos, userCredentials, cancellationToken);
}

/// <summary>
/// folds a stream using provided aggregator and seed.
/// </summary>
/// <typeparam name="T">The type of the folded State.</typeparam>
/// <typeparam name="E">The type of deserialized events.</typeparam>
/// <param name="deserialize">A deserialization function returning zero, one, or multiple events for a the given <see cref="ResolvedEvent"/>.</param>
/// <param name="aggregator">An aggregation function returning a new state from last state and current deserialized event.</param>
/// <param name="streamName">The name of the stream to fold.</param>
/// <param name="revision">The <see cref="Position"/> of the first event to fold.</param>
/// <param name="seed">The seed state to start the aggregation.</param>
/// <param name="configureOperationOptions">An <see cref="Action{EventStoreClientOperationOptions}"/> to configure the operation's options.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="userCredentials">The optional <see cref="UserCredentials"/> to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns>A <see cref="FoldResult{T}"/> containing the aggregation result and the <see cref="StreamRevision"/> of the last event aggregated.</returns>
public async ValueTask<FoldResult<T>> FoldStreamAsync<T,E>(
Func<ResolvedEvent, IEnumerable<E>> deserialize,
Func<T,E,T> aggregator,
string streamName,
StreamPosition revision,
T seed,
Action<EventStoreClientOperationOptions>? configureOperationOptions = null,
bool resolveLinkTos = false,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var operationOptions = Settings.OperationOptions.Clone();
configureOperationOptions?.Invoke(operationOptions);

var readReq =
new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = ReadReq.Types.Options.Types.StreamOptions.FromStreamNameAndRevision(streamName, revision),
Count = long.MaxValue
}
};
if (readReq.Options.Filter == null) {
readReq.Options.NoFilter = new Empty();
}

readReq.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption { Structured = new Empty() };

var call =
_client.Read(readReq,
EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken))
.ResponseStream.ReadAllAsync().GetAsyncEnumerator();


var hasNext = await call.MoveNextAsync(cancellationToken).ConfigureAwait(false);

var rev =
hasNext && call.Current.ContentCase == ReadResp.ContentOneofCase.StreamNotFound
? StreamRevision.None
: hasNext
? StreamRevision.FromStreamPosition(revision)
: StreamRevision.FromStreamPosition(revision-1);

while(hasNext) {
if (call.Current.ContentCase == ReadResp.ContentOneofCase.Event) {
var re = ConvertToResolvedEvent(call.Current.Event);
if (re.Event != null) {
rev = StreamRevision.FromStreamPosition(re.Event.EventNumber);
foreach (var e in deserialize(re))
seed = aggregator(seed, e);
}
}
hasNext = await call.MoveNextAsync(cancellationToken).ConfigureAwait(false);
}

return new FoldResult<T>(rev, seed);
}

/// <summary>
/// A class that represents the result of a read operation.
/// </summary>
Expand Down
27 changes: 27 additions & 0 deletions src/EventStore.Client.Streams/FoldResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#nullable enable
namespace EventStore.Client {
/// <summary>
/// Represents the result of a <see cref="EventStoreClient.FoldStreamAsync{T, E}(System.Func{ResolvedEvent, System.Collections.Generic.IEnumerable{E}}, System.Func{T, E, T}, string, StreamPosition, T, System.Action{EventStoreClientOperationOptions}?, bool, UserCredentials?, System.Threading.CancellationToken)" /> call.
/// </summary>
/// <typeparam name="T">The type of the aggregated state.</typeparam>
public struct FoldResult<T> {
/// <summary>
/// The position of the last event aggregated.
/// </summary>
public StreamRevision Revision { get; }
/// <summary>
/// The aggregation result.
/// </summary>
public T Value { get; }

/// <summary>
/// Build an instance of <see cref="FoldResult{T}"/>.
/// </summary>
/// <param name="revision">The last event aggregated.</param>
/// <param name="value">The aggregation result.</param>
public FoldResult(StreamRevision revision, T value) {
Revision = revision;
Value = value;
}
}
}
212 changes: 212 additions & 0 deletions test/EventStore.Client.Streams.Tests/fold_stream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
using System.Collections.Generic;

namespace EventStore.Client {
[Trait("Category", "Network")]
public class fold_stream : IClassFixture<fold_stream.Fixture> {
private readonly Fixture _fixture;

public fold_stream(Fixture fixture) {
_fixture = fixture;
}


[Fact]
public async Task fold_is_called_for_each_event() {
var streamName = _fixture.GetStreamName();
const int count = 20;

await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream,
_fixture.CreateTestEvents(count));

var result = await _fixture.Client.FoldStreamAsync(
e => new[] { e.Event.EventNumber },
(acc, e) => { acc.Add(e); return acc; },
streamName,
StreamPosition.Start,
new List<StreamPosition>());

var expected =
Enumerable.Range(0, count)
.Select(p => new StreamPosition((ulong)p));

Assert.Equal(expected, result.Value);
}

[Fact]
public async Task fold_result_revision_is_last_event() {
var streamName = _fixture.GetStreamName();
const int count = 20;

await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream,
_fixture.CreateTestEvents(count));

var result = await _fixture.Client.FoldStreamAsync(
e => new[] { e.Event.EventNumber },
(acc, e) => acc + 1,
streamName,
StreamPosition.Start,
0);



Assert.Equal(StreamRevision.FromInt64 (count-1), result.Revision);
}


[Fact]
public async Task fold_result_revision_is_none_when_stream_doesnt_exist() {
var streamName = _fixture.GetStreamName();

var result = await _fixture.Client.FoldStreamAsync(
e => new[] { e.Event.EventNumber },
(acc, e) => acc + 1,
streamName,
StreamPosition.Start,
0);

Assert.Equal(StreamRevision.None, result.Revision);
}

[Fact]
public async Task fold_result_value_is_seed_when_stream_doesnt_exist() {
var streamName = _fixture.GetStreamName();

var seed = new object();
var result = await _fixture.Client.FoldStreamAsync(
e => new[] { e.Event.EventNumber },
(acc, e) => new object(),
streamName,
StreamPosition.Start,
seed);

Assert.Same(seed, result.Value);
}


[Fact]
public async Task fold_result_is_seed_if_all_events_are_deserialized_to_empty() {
var streamName = _fixture.GetStreamName();
const int count = 20;

await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream,
_fixture.CreateTestEvents(count));

var seed = new object();
var result = await _fixture.Client.FoldStreamAsync(
e => Array.Empty<int>(),
(acc, e) => new object(),
streamName,
StreamPosition.Start,
seed);



Assert.Same(seed, result.Value);
}


[Fact]
public async Task fold_starting_after_last_event_has_position() {
var streamName = _fixture.GetStreamName();
const int pos = 20;
const int count = 20;

await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream,
_fixture.CreateTestEvents(count));

var result = await _fixture.Client.FoldStreamAsync(
e => new[] { e.Event.EventNumber },
(acc, e) => { acc.Add(e); return acc; },
streamName,
StreamPosition.FromInt64(pos),
new List<StreamPosition>());


var expected = StreamRevision.FromInt64(pos-1);
Assert.Equal(expected, result.Revision);
}


[Fact]
public async Task fold_starting_from_position_contains_expected_events () {
var streamName = _fixture.GetStreamName();
const int pos = 10;
const int count = 20;

await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream,
_fixture.CreateTestEvents(count));

var result = await _fixture.Client.FoldStreamAsync(
e => new[] { e.Event.EventNumber },
(acc, e) => { acc.Add(e); return acc; },
streamName,
StreamPosition.FromInt64(pos),
new List<StreamPosition>());


var expected =
Enumerable.Range(pos, count-pos)
.Select(p => new StreamPosition((ulong)p));

Assert.Equal(expected, result.Value);
}

[Fact]
public async Task fold_starting_from_start_contains_expected_events() {
var streamName = _fixture.GetStreamName();
const int count = 20;

await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream,
_fixture.CreateTestEvents(count));

var result = await _fixture.Client.FoldStreamAsync(
e => new[] { e },
(acc, e) => { acc.Add(e.Event.EventNumber); return acc; },
streamName,
StreamPosition.Start,
new List<StreamPosition>());


var expected =
Enumerable.Range(0, count)
.Select(p => new StreamPosition((ulong)p));

Assert.Equal(expected, result.Value);
}


[Fact]
public async Task fold_with_deserializer_returning_multiple_events_process_them_in_order() {
var streamName = _fixture.GetStreamName();
const int count = 20;

await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream,
_fixture.CreateTestEvents(count));

var result = await _fixture.Client.FoldStreamAsync(
e => new[] { (e.Event.EventNumber, 0 ), ( e.Event.EventNumber, 1 ) },
(acc, e) => { acc.Add(e); return acc; },
streamName,
StreamPosition.Start,
new List<(StreamPosition,int)>());


var expected =
from p in Enumerable.Range(0, count)
from id in Enumerable.Range(0,2)
select (new StreamPosition((ulong)p), id);

Assert.Equal(expected, result.Value);
}


public class Fixture : EventStoreClientFixture {
protected override Task Given() => Task.CompletedTask;
protected override Task When() => Task.CompletedTask;
}
}
}