Skip to content

Commit 1d63ca2

Browse files
committed
First class migration support for switching a projection from Inline to Async. Closes GH-3564
1 parent a0ee4c7 commit 1d63ca2

File tree

5 files changed

+252
-1
lines changed

5 files changed

+252
-1
lines changed

docs/configuration/cli.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ And finally, use Oakton as the command line parser and executor by replacing `Ap
3333
// as the last line of your Program.cs file
3434
return await app.RunOaktonCommands(args);
3535
```
36-
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/samples/MinimalAPI/Program.cs#L51-L57' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_webapplication_2' title='Start of snippet'>anchor</a></sup>
36+
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/samples/MinimalAPI/Program.cs#L55-L61' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_webapplication_2' title='Start of snippet'>anchor</a></sup>
3737
<!-- endSnippet -->
3838

3939
Once the _Marten.CommandLine_ Nuget is installed and Oakton is handling your command line parsing, you should be able to see the Marten commands by typing `dotnet run -- help` in the command line terminal of your choice at the root of your project:

docs/events/projections/async-daemon.md

+37
Original file line numberDiff line numberDiff line change
@@ -528,3 +528,40 @@ var latest = await session.QueryForNonStaleData<Trip>(5.Seconds())
528528

529529
Do note that this can time out if the projection just can't catch up to the latest event sequence in time. You may need to
530530
be both cautious with using this in general, and also cautious especially with the timeout setting.
531+
532+
## Migrating a Projection from Inline to Async <Badge type="tip" text="7.35" />
533+
534+
::: warning
535+
This will only work correctly *if* you have system downtime before migrating the new version of the code with this option
536+
enabled. This feature cannot support a "blue/green" deployment model. Marten needs to system to be at rest before it starts
537+
up the projection asynchronously or there's a chance you may "skip" events in the projection.
538+
:::
539+
540+
During the course of a system's lifetime, you may find that you want to change a projection that's currently running
541+
with a lifecycle of `Inline` to running asynchronously instead. If you need to do this *and* there is no structural change
542+
to the projection that would require a projection rebuild, you can direct Marten to start that projection at the highest
543+
sequence number assigned by the system (not the high water mark, but the event sequence number which may be higher).
544+
545+
To do so, use this option when registering the projection:
546+
547+
<!-- snippet: sample_using_subscribe_as_inline_to_async -->
548+
<a id='snippet-sample_using_subscribe_as_inline_to_async'></a>
549+
```cs
550+
opts
551+
.Projections
552+
.Snapshot<SimpleAggregate>(SnapshotLifecycle.Async, o =>
553+
{
554+
// This option tells Marten to start the async projection at the highest
555+
// event sequence assigned as the processing floor if there is no previous
556+
// async daemon progress for this projection
557+
o.SubscribeAsInlineToAsync();
558+
});
559+
```
560+
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/converting_projection_from_inline_to_async.cs#L31-L43' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_subscribe_as_inline_to_async' title='Start of snippet'>anchor</a></sup>
561+
<!-- endSnippet -->
562+
563+
Just to be clear, when Marten's async daemon starts a projection with this starting option:
564+
565+
1. If there is no previously recorded progression, Marten will start processing this projection with the highest assigned event sequence
566+
in the database as the floor and record that value as the current progress
567+
2. If there is a previously recorded progression, Marten will start processing this projection at the recorded sequence as normal

src/DaemonTests/AsyncOptionsTests.cs

+24
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,30 @@ public async Task subscribe_from_sequence_hit_with_prior_higher_than_threshold_a
239239
.ShouldBe(new Position(222L, true));
240240
}
241241

242+
[Fact]
243+
public async Task transition_from_inline_to_async_no_initial_progress()
244+
{
245+
theDatabase.ProjectionProgressFor(theName, theToken).Returns(0);
246+
theDatabase.FetchHighestEventSequenceNumber().Returns(1234L);
247+
var options = new AsyncOptions();
248+
options.SubscribeAsInlineToAsync();
249+
250+
(await options.DetermineStartingPositionAsync(1000L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
251+
.ShouldBe(new Position(1234L, true));
252+
}
253+
254+
[Fact]
255+
public async Task transition_from_inline_to_async_but_there_is_initial_progress()
256+
{
257+
theDatabase.ProjectionProgressFor(theName, theToken).Returns(1000L);
258+
theDatabase.FetchHighestEventSequenceNumber().Returns(2005L);
259+
var options = new AsyncOptions();
260+
options.SubscribeAsInlineToAsync();
261+
262+
(await options.DetermineStartingPositionAsync(2003L, theName, ShardExecutionMode.Continuous, theDatabase, theToken))
263+
.ShouldBe(new Position(1000L, false));
264+
}
265+
242266

243267

244268

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using DaemonTests.TestingSupport;
4+
using JasperFx.Core;
5+
using Marten.Events.Projections;
6+
using Marten.Metadata;
7+
using Marten.Testing.Harness;
8+
using Shouldly;
9+
using Xunit;
10+
using Xunit.Abstractions;
11+
12+
namespace DaemonTests;
13+
14+
public class converting_projection_from_inline_to_async : OneOffConfigurationsContext
15+
{
16+
[Fact]
17+
public async Task start_as_inline_move_to_async_and_just_continue()
18+
{
19+
StoreOptions(opts =>
20+
{
21+
opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Inline);
22+
});
23+
24+
var id1 = theSession.Events.StartStream<SimpleAggregate>(new AEvent(), new BEvent()).Id;
25+
var id2 = theSession.Events.StartStream<SimpleAggregate>(new BEvent(), new CEvent()).Id;
26+
var id3 = theSession.Events.StartStream<SimpleAggregate>(new CEvent(), new DEvent()).Id;
27+
await theSession.SaveChangesAsync();
28+
29+
var store2 = SeparateStore(opts =>
30+
{
31+
#region sample_using_subscribe_as_inline_to_async
32+
33+
opts
34+
.Projections
35+
.Snapshot<SimpleAggregate>(SnapshotLifecycle.Async, o =>
36+
{
37+
// This option tells Marten to start the async projection at the highest
38+
// event sequence assigned as the processing floor if there is no previous
39+
// async daemon progress for this projection
40+
o.SubscribeAsInlineToAsync();
41+
});
42+
43+
#endregion
44+
});
45+
46+
using var daemon = await store2.BuildProjectionDaemonAsync();
47+
await daemon.StartAllAsync();
48+
49+
using var session = store2.LightweightSession();
50+
session.Events.Append(id1, new EEvent(), new EEvent());
51+
session.Events.Append(id2, new EEvent(), new EEvent());
52+
session.Events.Append(id3, new EEvent(), new EEvent());
53+
await session.SaveChangesAsync();
54+
55+
await daemon.WaitForNonStaleData(10.Seconds());
56+
57+
var aggregate1 = await session.LoadAsync<SimpleAggregate>(id1);
58+
var aggregate2 = await session.LoadAsync<SimpleAggregate>(id2);
59+
var aggregate3 = await session.LoadAsync<SimpleAggregate>(id3);
60+
61+
aggregate1.ShouldBe(new SimpleAggregate
62+
{
63+
Id = id1,
64+
Version = 4,
65+
ACount = 1,
66+
BCount = 1,
67+
ECount = 2
68+
});
69+
70+
aggregate2.ShouldBe(new SimpleAggregate
71+
{
72+
Id = id2,
73+
Version = 4,
74+
BCount = 1,
75+
CCount = 1,
76+
ECount = 2
77+
});
78+
79+
}
80+
}
81+
82+
public class SimpleAggregate : IRevisioned
83+
{
84+
// This will be the aggregate version
85+
public int Version { get; set; }
86+
87+
public Guid Id { get; set; }
88+
89+
public int ACount { get; set; }
90+
public int BCount { get; set; }
91+
public int CCount { get; set; }
92+
public int DCount { get; set; }
93+
public int ECount { get; set; }
94+
95+
public void Apply(AEvent _)
96+
{
97+
ACount++;
98+
}
99+
100+
public void Apply(BEvent _)
101+
{
102+
BCount++;
103+
}
104+
105+
public void Apply(CEvent _)
106+
{
107+
CCount++;
108+
}
109+
110+
public void Apply(DEvent _)
111+
{
112+
DCount++;
113+
}
114+
115+
public void Apply(EEvent _)
116+
{
117+
ECount++;
118+
}
119+
120+
public override string ToString()
121+
{
122+
return
123+
$"{nameof(Version)}: {Version}, {nameof(Id)}: {Id}, {nameof(ACount)}: {ACount}, {nameof(BCount)}: {BCount}, {nameof(CCount)}: {CCount}, {nameof(DCount)}: {DCount}, {nameof(ECount)}: {ECount}";
124+
}
125+
126+
protected bool Equals(SimpleAggregate other)
127+
{
128+
return Id.Equals(other.Id) && ACount == other.ACount && BCount == other.BCount && CCount == other.CCount && DCount == other.DCount && ECount == other.ECount;
129+
}
130+
131+
public override bool Equals(object obj)
132+
{
133+
if (obj is null)
134+
{
135+
return false;
136+
}
137+
138+
if (ReferenceEquals(this, obj))
139+
{
140+
return true;
141+
}
142+
143+
if (obj.GetType() != GetType())
144+
{
145+
return false;
146+
}
147+
148+
return Equals((SimpleAggregate)obj);
149+
}
150+
151+
public override int GetHashCode()
152+
{
153+
return HashCode.Combine(Id, ACount, BCount, CCount, DCount, ECount);
154+
}
155+
}

src/Marten/Events/Daemon/AsyncOptions.cs

+35
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,23 @@ public AsyncOptions SubscribeFromSequence(long sequenceFloor, string? databaseId
149149
_strategies.Add(new FromSequence(databaseIdentifier, sequenceFloor));
150150
return this;
151151
}
152+
153+
/// <summary>
154+
/// Use this option to prevent having to rebuild a projection when you are
155+
/// simply changing the projection lifecycle from "Inline" to "Async" but are
156+
/// making no other changes that would force a rebuild
157+
///
158+
/// Direct that this projection had previously been running with an "Inline"
159+
/// lifecycle now run as "Async". This will cause Marten to first check if there
160+
/// is any previous async progress, and if not, start the projection from the highest
161+
/// event sequence for the system.
162+
/// </summary>
163+
/// <returns></returns>
164+
public AsyncOptions SubscribeAsInlineToAsync()
165+
{
166+
_strategies.Add(new InlineToAsync());
167+
return this;
168+
}
152169
}
153170

154171
internal record Position(long Floor, bool ShouldUpdateProgressFirst);
@@ -162,6 +179,24 @@ Task<Position> DetermineStartingPositionAsync(long highWaterMark, ShardName name
162179
CancellationToken token);
163180
}
164181

182+
internal class InlineToAsync(): IPositionStrategy
183+
{
184+
public string? DatabaseName => null;
185+
186+
public async Task<Position> DetermineStartingPositionAsync(long highWaterMark, ShardName name, ShardExecutionMode mode,
187+
IMartenDatabase database, CancellationToken token)
188+
{
189+
var current = await database.ProjectionProgressFor(name, token).ConfigureAwait(false);
190+
if (current > 0)
191+
{
192+
return new Position(current, false);
193+
}
194+
195+
var highest = await database.FetchHighestEventSequenceNumber(token).ConfigureAwait(false);
196+
return new Position(highest, true);
197+
}
198+
}
199+
165200
internal class FromSequence(string? databaseName, long sequence): IPositionStrategy
166201
{
167202
public string? DatabaseName { get; } = databaseName;

0 commit comments

Comments
 (0)