Reboot Projection API Model #3052
I'm shutting this down for now. The "grouping/slicing" stuff probably gets re-evaluated as part of Marten 8.

Nope, client work today made me go into the guts of the projection code, and now's a decent time to rethink this for Marten 8.

Current Thinking (September 21st, 2024)

More details to come.
Notes on Nov. 10th

Current state of `IProjectionSource`:

```csharp
public interface IProjectionSource : IReadOnlyProjectionData
{
    AsyncOptions Options { get; }

    /// <summary>
    /// This is *only* a hint to Marten about what projected document types
    /// are published by this projection to aid the "generate ahead" model
    /// </summary>
    /// <returns></returns>
    IEnumerable<Type> PublishedTypes();

    IReadOnlyList<AsyncProjectionShard> AsyncProjectionShards(DocumentStore store);

    ValueTask<EventRangeGroup> GroupEvents(DocumentStore store, IMartenDatabase daemonDatabase,
        EventRange range,
        CancellationToken cancellationToken);

    IProjection Build(DocumentStore store);

    ISubscriptionExecution BuildExecution(AsyncProjectionShard shard, DocumentStore store, IMartenDatabase database,
        ILogger logger);

    /// <summary>
    /// Specify that this projection is a version other than 1 of the original projection definition
    /// to opt into Marten's parallel blue/green deployment of this projection.
    /// </summary>
    public uint ProjectionVersion { get; set; }

    bool TryBuildReplayExecutor(DocumentStore store, IMartenDatabase database, out IReplayExecutor executor);
}
```

Notes:
```csharp
public interface IProjection
{
    ValueTask ApplyAsync(IDocumentOperations operations, IReadOnlyList<IEvent> events, CancellationToken cancellation);
}
```
SnapshotLifecycle

I hate the

User Entrypoints

So what will users actually use to create projections for Marten / Ermine / future critters?

EventProjection

"Self Aggregates"

This is the old "self aggregating" approach where you throw

Aggregation

Through validation, make every single one of these options mutually exclusive. You gotta do one or the other, but not mix

Aggregation<TDoc, TId>

Same rules and stuff as

Options:
```csharp
ValueTask<IReadOnlyList<EventSliceGroup<TDoc, TId>>> SliceAsyncEvents(
    IQuerySession querySession,
    List<IEvent> events);
```

That last signature gives you the ability to do anything: event data enrichment, using other data for slicing, special logic, you name it. And again, all of the above options are mutually exclusive. In all cases, I'd like to push users toward explicit code whenever the logic, workflow, or data lookup needs are anything other than simplistic.
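To make the "ability to do anything" point concrete, here is a minimal sketch of custom slicing that looks up other data to decide which slice an event belongs to. It relies on hypothetical stand-ins (`IEvent`, `Event`, `ILookupSession`, `DictionaryLookupSession`) rather than Marten's real `IEvent` and `IQuerySession`, uses made-up `InvoiceCreated`/`InvoicePaid` events, and returns a plain dictionary instead of `EventSliceGroup<TDoc, TId>`:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins; these are NOT Marten's IEvent / IQuerySession
public interface IEvent { object Data { get; } }
public sealed record Event(object Data) : IEvent;

public interface ILookupSession
{
    // Stand-in for querying existing documents during slicing
    ValueTask<Guid> FindCustomerIdForInvoice(Guid invoiceId);
}

public sealed class DictionaryLookupSession : ILookupSession
{
    private readonly IReadOnlyDictionary<Guid, Guid> _customerByInvoice;

    public DictionaryLookupSession(IReadOnlyDictionary<Guid, Guid> customerByInvoice)
        => _customerByInvoice = customerByInvoice;

    public ValueTask<Guid> FindCustomerIdForInvoice(Guid invoiceId)
        => new(_customerByInvoice[invoiceId]);
}

// Made-up events: InvoicePaid does not carry the customer id
public record InvoiceCreated(Guid InvoiceId, Guid CustomerId);
public record InvoicePaid(Guid InvoiceId, decimal Amount);

public static class CustomerSlicing
{
    public static async ValueTask<IReadOnlyDictionary<Guid, List<IEvent>>> SliceAsync(
        ILookupSession session, IReadOnlyList<IEvent> events)
    {
        var slices = new Dictionary<Guid, List<IEvent>>();
        // Invoices created earlier in this same batch, so no lookup is needed for them
        var customerByInvoice = new Dictionary<Guid, Guid>();

        foreach (var e in events)
        {
            Guid customerId;
            switch (e.Data)
            {
                case InvoiceCreated created:
                    customerByInvoice[created.InvoiceId] = created.CustomerId;
                    customerId = created.CustomerId;
                    break;
                case InvoicePaid paid:
                    // "Use other data for slicing": fall back to the session lookup
                    customerId = customerByInvoice.TryGetValue(paid.InvoiceId, out var known)
                        ? known
                        : await session.FindCustomerIdForInvoice(paid.InvoiceId);
                    break;
                default:
                    continue; // not relevant to this projection
            }

            if (!slices.TryGetValue(customerId, out var slice))
            {
                slice = new List<IEvent>();
                slices[customerId] = slice;
            }
            slice.Add(e);
        }

        return slices;
    }
}
```

Because the slicing is just a method over a list of events plus a session, it can be unit tested without a database by handing it a fake session.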
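I would suggest that if you want explicit projections to become the default, then the most common cases should be very easy. So maybe have a plain and simple `IProjection<T>` interface plus a `Projection<T>` base class:

```csharp
public interface IProjection<T>
{
    ValueTask<T> ApplyAsync(IDocumentOperations ops, IReadOnlyList<IEvent> events, CancellationToken cancellationToken);
}

public abstract class Projection<T> : IProjection<T> where T : class
{
    // Default implementation that you could override:
    // call Create on each event until the aggregate is not null,
    // then call Apply on the remaining events
    public virtual async ValueTask<T> ApplyAsync(IDocumentOperations ops, IReadOnlyList<IEvent> events, CancellationToken cancellationToken)
    {
        T? aggregate = null;
        foreach (var @event in events)
        {
            cancellationToken.ThrowIfCancellationRequested();
            aggregate = aggregate is null
                ? await Create(ops, @event)
                : await Apply(ops, aggregate, @event);
        }
        return aggregate ?? throw new InvalidOperationException("No event created the aggregate.");
    }

    // Returns null until it sees an event that can create the aggregate
    public abstract ValueTask<T?> Create(IDocumentOperations ops, IEvent @event);

    public abstract ValueTask<T> Apply(IDocumentOperations ops, T aggregate, IEvent @event);
}
```

Why split `Create` and `Apply`? Because of nullability. We and the compiler can now assume that the aggregate passed to `Apply` is never null.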
Proposed API -- Edited Dec. 3rd

Entry Points

By "entry points", I mean the interfaces or base classes that users would implement or override in order to implement a projection of some sort.

Directly Implement ILiveAggregator

This is already in the guts of Marten, but for simple aggregations where you never delete anything, just implement this:

```csharp
public interface ILiveAggregator<T>
{
    ValueTask<T> BuildAsync(
        IReadOnlyList<IEvent> events,
        IQuerySession session,
        T? snapshot,
        CancellationToken cancellation);
}
```

The general idea is that Marten (really JasperFx) would have adapters that let this run as an inline projection, an async projection, or as-is for live aggregations.

Directly Implement IProjection

In this generation,

```csharp
public interface IProjection
{
    Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<IEvent> events, CancellationToken cancellation);
}
```
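Here is a rough sketch of the adapter idea. It wraps an `ILiveAggregator<T>` so that it runs through `IProjection` as an inline or async projection: for each stream it loads the current snapshot, folds in the new events, and stores the result. `IEvent`, `IQuerySession`, `IDocumentOperations`, `LoadAsync` and `Store` are trimmed, hypothetical stand-ins rather than Marten's actual interfaces. `LiveAggregatorProjection`, `InMemoryOperations` and `CounterAggregator` are illustrative names only:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Trimmed, hypothetical stand-ins; NOT Marten's real interfaces
public interface IEvent { Guid StreamId { get; } object Data { get; } }
public sealed record Event(Guid StreamId, object Data) : IEvent;

public interface IQuerySession
{
    ValueTask<T?> LoadAsync<T>(Guid id, CancellationToken token) where T : class;
}

public interface IDocumentOperations : IQuerySession
{
    void Store<T>(Guid id, T document) where T : class;
}

// Shapes taken from the proposal above
public interface ILiveAggregator<T>
{
    ValueTask<T> BuildAsync(IReadOnlyList<IEvent> events, IQuerySession session, T? snapshot, CancellationToken cancellation);
}

public interface IProjection
{
    Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<IEvent> events, CancellationToken cancellation);
}

// Hypothetical adapter: runs a live aggregator as an inline/async projection
public sealed class LiveAggregatorProjection<T> : IProjection where T : class
{
    private readonly ILiveAggregator<T> _aggregator;

    public LiveAggregatorProjection(ILiveAggregator<T> aggregator) => _aggregator = aggregator;

    public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<IEvent> events, CancellationToken cancellation)
    {
        // One aggregate per stream: load the snapshot, fold the new events, store the result
        foreach (var stream in events.GroupBy(e => e.StreamId))
        {
            var snapshot = await operations.LoadAsync<T>(stream.Key, cancellation);
            var updated = await _aggregator.BuildAsync(stream.ToList(), operations, snapshot, cancellation);
            operations.Store(stream.Key, updated);
        }
    }
}

// In-memory operations so the sketch runs without a database
public sealed class InMemoryOperations : IDocumentOperations
{
    private readonly Dictionary<(Type, Guid), object> _documents = new();

    public ValueTask<T?> LoadAsync<T>(Guid id, CancellationToken token) where T : class
        => new(_documents.TryGetValue((typeof(T), id), out var doc) ? (T)doc : null);

    public void Store<T>(Guid id, T document) where T : class => _documents[(typeof(T), id)] = document;
}

public sealed class Counter { public int Count { get; set; } }
public record Incremented();

public sealed class CounterAggregator : ILiveAggregator<Counter>
{
    public ValueTask<Counter> BuildAsync(IReadOnlyList<IEvent> events, IQuerySession session, Counter? snapshot, CancellationToken cancellation)
    {
        var counter = snapshot ?? new Counter();
        counter.Count += events.Count(e => e.Data is Incremented);
        return new ValueTask<Counter>(counter);
    }
}
```

A live aggregation would call `BuildAsync` directly without a stored snapshot. The adapter adds only the load and store steps around that call.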
Subclassing
Slicing APIs -- Dec 3rd

I originally thought that Marten 8 wouldn't have very much change in the public API, but the main goal is to pull common stuff out for use in Ermine or any other future event store critters, so maybe there's room to consider actual disruptive changes. And I will freely admit that I think the event slicing API is about the least usable API in the entire Critter Stack, and I've always been unhappy with it outside of easy multi-stream aggregations. I'm also aware that widespread API changes will require a comprehensive migration guide that might be time consuming, but the documentation on multi-stream projections isn't terribly good today anyway, so ¯\_(ツ)_/¯.

Enough, let's get started. Here's a static view of the base data holders for slicing in Marten 8 so far:

```mermaid
classDiagram
    class EventSliceGroup {
        string TenantId
    }
    EventSliceGroup ..|> IEventGrouping
    EventSliceGroup *.. EventSlice
    EventSlice o.. IEvent
```
Just know that most of the types have generic arguments for
```csharp
public interface IEventGrouping<TId>
{
    void AddEvent(TId id, IEvent @event);
    void AddEvents(TId id, IEnumerable<IEvent> events);

    // This time it would be smart enough to paper over a TEvent of either
    // InvoiceAdded or IEvent<InvoiceAdded>
    void AddEvents<TEvent>(Func<TEvent, TId> singleIdSource, IEnumerable<IEvent> events);

    // This time it would be smart enough to paper over a TEvent of either
    // InvoiceAdded or IEvent<InvoiceAdded>
    void AddEvents<TEvent>(Func<TEvent, IEnumerable<TId>> multipleIdSource, IEnumerable<IEvent> events);

    void FanOutOnEach<TSource, TChild>(Func<TSource, IEnumerable<TChild>> fanOutFunc);
}
```

By making it smart enough to understand the different mechanics from

IEventSlicer

The JasperFx interface to express slicing will be this:

```csharp
public interface IEventSlicer<TDoc, TId>
{
    // Making a big assumption here that the multi-tenancy sorting -- if any -- happens
    // independently of this slicing
    ValueTask SliceAsync(IEventGrouping<TId> grouping, IReadOnlyList<IEvent> events);
}
```

The general idea is that you'll get a group of events, then place them (or derived, virtual events) in the grouping by the right id. The Marten-specific version that plugs into that would be:

```csharp
public interface IMartenEventSlicer<TDoc, TId>
{
    // Making a big assumption here that the multi-tenancy sorting -- if any -- happens
    // independently of this slicing
    ValueTask SliceAsync(IQuerySession session, IEventGrouping<TId> grouping, IReadOnlyList<IEvent> events);
}
```

Creating slicing rules

I guess that both `MultiStreamProjection` and maybe `FlatTableProjection` will have the same behavior here. The first option for defining slicing behavior would be to keep the existing methods for simplistic slicing.

The second option would be to override this new method:

```csharp
public virtual TId IdentityFor(IEvent e)
```

The third option would be to override this new method for really custom slicing where you may also want to do event enrichment in a batch:

```csharp
public virtual ValueTask SliceAsync(IQuerySession session, IEventGrouping<TId> grouping, IReadOnlyList<IEvent> events);
```

And a fourth, rarer model to just call:

that works as it does today. I would vote to delete
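This is a minimal in-memory sketch of how the grouping and the `IdentityFor` option could fit together. It includes the "paper over `InvoiceAdded` vs. `IEvent<InvoiceAdded>`" behavior for `AddEvents<TEvent>`. It implements only part of the proposed `IEventGrouping<TId>`, as a concrete class. `IEvent`, `Event<T>`, `EventGrouping<TId>`, `SlicingProjection<TId>` and `CustomerProjection` are hypothetical stand-ins, not JasperFx or Marten types:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins; NOT the real Marten/JasperFx types
public interface IEvent { object Data { get; } }
public interface IEvent<out T> : IEvent { new T Data { get; } }

public sealed class Event<T> : IEvent<T> where T : notnull
{
    public Event(T data) => Data = data;
    public T Data { get; }
    object IEvent.Data => Data;
}

// Concrete, partial version of the proposed IEventGrouping<TId>
public sealed class EventGrouping<TId> where TId : notnull
{
    private readonly Dictionary<TId, List<IEvent>> _slices = new();
    public IReadOnlyDictionary<TId, List<IEvent>> Slices => _slices;

    public void AddEvent(TId id, IEvent @event)
    {
        if (!_slices.TryGetValue(id, out var slice))
        {
            slice = new List<IEvent>();
            _slices[id] = slice;
        }
        slice.Add(@event);
    }

    // TEvent may be the payload type (InvoiceAdded) or the wrapper (IEvent<InvoiceAdded>)
    public void AddEvents<TEvent>(Func<TEvent, TId> singleIdSource, IEnumerable<IEvent> events)
    {
        foreach (var e in events)
        {
            if (e is TEvent wrapper) AddEvent(singleIdSource(wrapper), e);
            else if (e.Data is TEvent data) AddEvent(singleIdSource(data), e);
        }
    }
}

// Hypothetical base class: override IdentityFor (option two) or SliceAsync (option three)
public abstract class SlicingProjection<TId> where TId : notnull
{
    public virtual TId IdentityFor(IEvent e)
        => throw new NotSupportedException("Override IdentityFor or SliceAsync");

    public virtual ValueTask SliceAsync(EventGrouping<TId> grouping, IReadOnlyList<IEvent> events)
    {
        foreach (var e in events) grouping.AddEvent(IdentityFor(e), e);
        return default; // a completed ValueTask
    }
}

public record InvoiceAdded(Guid CustomerId, decimal Amount);
public record CustomerRenamed(Guid CustomerId, string Name);

public sealed class CustomerProjection : SlicingProjection<Guid>
{
    public override Guid IdentityFor(IEvent e) => e.Data switch
    {
        InvoiceAdded added => added.CustomerId,
        CustomerRenamed renamed => renamed.CustomerId,
        _ => throw new ArgumentOutOfRangeException(nameof(e), "Unexpected event type"),
    };
}
```

Overriding `IdentityFor` covers the one-id-per-event case. A subclass that needs batch enrichment or fan-out would override `SliceAsync` instead.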
Never mind, we can keep this so we don't break backward compatibility.

Eliminate the support for
For the Naming Suggestions
Documentation Suggestions
```csharp
/// <summary>
/// A read-only list of events that are part of the current slice being processed by the projection.
/// </summary>
/// <param name="events">The events included in the current processing context, possibly pre-filtered or grouped.</param>
```

Of course, I will help you write explicit documentation on this (even if you do not implement any of the above suggestions).
Just to try out `ILiveAggregator`: I think it makes for a simplified design in this `ShoppingCart` example:

```csharp
public class ShoppingCart
{
    public Guid Id { get; set; }
    public List<string> Items { get; set; } = new();
    public bool IsCheckedOut { get; set; }
}

public class ShoppingCartAggregator : ILiveAggregator<ShoppingCart>
{
    public ValueTask<ShoppingCart> BuildAsync(
        IReadOnlyList<IEvent> events,
        IQuerySession session,
        ShoppingCart? snapshot,
        CancellationToken cancellation)
    {
        // Initialize the aggregate if it doesn't exist
        var cart = snapshot ?? new ShoppingCart();

        foreach (var @event in events)
        {
            switch (@event.Data)
            {
                case ItemAdded itemAdded:
                    cart.Items.Add(itemAdded.ItemName);
                    break;
                case ItemRemoved itemRemoved:
                    cart.Items.Remove(itemRemoved.ItemName);
                    break;
                case CartCheckedOut _:
                    cart.IsCheckedOut = true;
                    break;
                default:
                    // Report the event payload type, not the IEvent wrapper type
                    throw new InvalidOperationException($"Unknown event type: {@event.Data.GetType()}");
            }
        }

        return new ValueTask<ShoppingCart>(cart);
    }
}

public record ItemAdded(string ItemName);
public record ItemRemoved(string ItemName);
public record CartCheckedOut();
```

You have a lot of power over the creation process and maybe some interaction between events.
Proposed
One way to deal with the null checks is of course:

```csharp
public class ShoppingCartAggregator : ILiveAggregator<ShoppingCart>
{
    public ValueTask<ShoppingCart> BuildAsync(
        IReadOnlyList<IEvent> events,
        IQuerySession session,
        ShoppingCart? snapshot,
        CancellationToken cancellation)
    {
        ShoppingCart cart = snapshot ?? events.OfType<IEvent<CartCreated>>().FirstOrDefault() switch
        {
            IEvent<CartCreated> created => ShoppingCart.Create(created.Data.Id),
            _ => throw new InvalidOperationException("Cart must be created before other events can be applied.")
        };

        foreach (var @event in events)
        {
            cart = @event.Data switch
            {
                // Already handled when initializing the cart above
                CartCreated => cart,
                ItemAdded itemAdded => cart.Apply(itemAdded),
                ItemRemoved itemRemoved => cart.Apply(itemRemoved),
                CartCheckedOut checkedOut => cart.Apply(checkedOut),
                _ => throw new InvalidOperationException($"Unknown event type: {@event.Data.GetType()}")
            };
        }

        return new ValueTask<ShoppingCart>(cart);
    }
}
```

But there is a performance penalty for the initial `OfType` filtering. It might not be a big deal, since the creation event should be event zero and `FirstOrDefault()` stops enumerating at the first match. So with some documentation on that, this might work.
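The `OfType` cost claim can be checked directly. In LINQ to Objects, both `OfType` and `FirstOrDefault` stream their input, so enumeration stops as soon as a match is found. This sketch counts how many events are actually pulled. `IEvent`, `IEvent<T>`, `Event<T>` and `FirstEventProbe` are hypothetical stand-ins, not Marten types:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins; NOT Marten's real IEvent types
public interface IEvent { object Data { get; } }
public interface IEvent<out T> : IEvent { new T Data { get; } }

public sealed class Event<T> : IEvent<T> where T : notnull
{
    public Event(T data) => Data = data;
    public T Data { get; }
    object IEvent.Data => Data;
}

public record CartCreated(Guid Id);
public record ItemAdded(string ItemName);

public static class FirstEventProbe
{
    // Returns the first CartCreated event plus how many events LINQ actually pulled.
    // OfType and FirstOrDefault both stream, so enumeration stops at the first match.
    public static (IEvent<CartCreated>? Found, int Visited) FindCreation(IEnumerable<IEvent> events)
    {
        var visited = 0;
        var found = events
            .Select(e => { visited++; return e; })
            .OfType<IEvent<CartCreated>>()
            .FirstOrDefault();
        return (found, visited);
    }
}
```

When the creation event comes first, only one event is visited. Only a stream with no `CartCreated` event pays for a full scan, and that case throws anyway when there is no snapshot.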
Suggested Improvement: Helper Methods for Null Handling and Initialization

To reduce boilerplate and improve readability in

These helper methods would simplify event processing logic, making implementations more concise and less error-prone.

Proposed Helper Methods

```csharp
public static class AggregatorHelpers
{
    /// <summary>
    /// Ensures that the aggregate is not null. Throws an exception if it is null.
    /// </summary>
    public static T EnsureExists<T>(T? aggregate, string errorMessage) where T : class
    {
        return aggregate ?? throw new InvalidOperationException(errorMessage);
    }

    /// <summary>
    /// Initializes the aggregate from an event, or throws if the event is not found.
    /// </summary>
    public static T InitializeFromEvent<T, TEvent>(T? current, IReadOnlyList<IEvent> events, Func<IEvent<TEvent>, T> create)
        where TEvent : class
    {
        if (current is not null)
            return current;

        var createEvent = events.OfType<IEvent<TEvent>>().FirstOrDefault();
        return createEvent != null
            ? create(createEvent)
            : throw new InvalidOperationException($"Missing initialization event of type {typeof(TEvent).Name}");
    }
}
```

Example: Refactored
@erdtsieck The conventional approaches to aggregation already make sure that the snapshot exists before calling into the `Apply` methods. Helpers for writing purely explicit projections sound good to me, though.
These are medium- to long-term things, folks, so nobody should get too upset or anxious about anything here yet. I'm just thinking out loud.

Problems:

- `Create()`/`Apply()` is successful for simple workflows, but breaks down badly for any kind of remotely complicated workflow
- `CustomProjection` is a bit clumsy to use today. It doesn't work for live aggregations. It would help if this were an easier escape hatch from the conventional/codegen approach

Possible Solutions

- `CustomProjection` so that it can be used for live aggregations
- `Store/Delete/UnDelete/Archive/Partials` telling Marten what to do next. Vague hope here is to do something that's easy to test

Independently, I'd like to move this one up into a release soon: #2533
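One possible reading of "telling Marten what to do next" is a pure function that returns an instruction instead of mutating a session, which keeps it trivially testable. This is a hypothetical sketch only. `SnapshotAction<T>`, `OrderProjection.Evolve` and the order events are made-up names, and only a subset of the listed actions (store, archive, delete) is modeled:

```csharp
using System;

// Hypothetical result type telling the store what to do next; NOT an existing Marten API
public abstract record SnapshotAction<T>
{
    public sealed record Store(T Snapshot) : SnapshotAction<T>;
    public sealed record Archive(T Snapshot) : SnapshotAction<T>;
    public sealed record Delete() : SnapshotAction<T>;
}

// Made-up document and events for illustration
public record Order(Guid Id, decimal Total, bool Shipped);
public record OrderPlaced(Guid Id, decimal Total);
public record OrderShipped();
public record OrderCancelled();

public static class OrderProjection
{
    // Pure function: current snapshot + one event -> instruction, so tests need no database
    public static SnapshotAction<Order> Evolve(Order? current, object @event) => (current, @event) switch
    {
        (null, OrderPlaced placed) => new SnapshotAction<Order>.Store(new Order(placed.Id, placed.Total, false)),
        (Order order, OrderShipped _) => new SnapshotAction<Order>.Archive(order with { Shipped = true }),
        (Order _, OrderCancelled _) => new SnapshotAction<Order>.Delete(),
        _ => throw new InvalidOperationException($"Unexpected {@event.GetType().Name} for the current state"),
    };
}
```

Each case is checked with a single assertion on a returned value. A separate adapter would then translate the returned action into the actual store, archive, or delete calls.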