From 0630f5526d9ac22c76e69261e4d64b4ac4a4b0bc Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 17 Jul 2024 07:11:24 -0500 Subject: [PATCH] WIP: Experimental "fix" for GH-3310 --- ...10_inline_projections_with_quick_append.cs | 38 ++- .../Generated/EventStore/EventStorage.cs | 286 ------------------ ...EventProjectionRuntimeSupport2042637388.cs | 39 --- .../Events/Aggregation/AggregationRuntime.cs | 1 + .../QuickAppendEventsOperationBase.cs | 4 + .../Projections/ProjectionDocumentPolicy.cs | 1 + src/Marten/Events/StreamAction.cs | 37 ++- src/Marten/Internal/UnitOfWork.cs | 2 +- src/Marten/Internal/UpdateBatch.cs | 1 - src/Marten/Schema/DocumentMapping.cs | 6 + src/Marten/Storage/UpsertFunction.cs | 32 +- 11 files changed, 103 insertions(+), 344 deletions(-) delete mode 100644 src/EventSourcingTests/Internal/Generated/EventStore/EventStorage.cs delete mode 100644 src/EventSourcingTests/Internal/Generated/EventStore/MyEventProjectionRuntimeSupport2042637388.cs diff --git a/src/EventSourcingTests/Bugs/Bug_3310_inline_projections_with_quick_append.cs b/src/EventSourcingTests/Bugs/Bug_3310_inline_projections_with_quick_append.cs index 9be2e2386a..90fc0ed6cb 100644 --- a/src/EventSourcingTests/Bugs/Bug_3310_inline_projections_with_quick_append.cs +++ b/src/EventSourcingTests/Bugs/Bug_3310_inline_projections_with_quick_append.cs @@ -2,10 +2,12 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using JasperFx.CodeGeneration; using JasperFx.Core; using Marten; using Marten.Events; using Marten.Events.Projections; +using Marten.Metadata; using Marten.Schema; using Marten.Storage; using Marten.Testing.Harness; @@ -40,14 +42,46 @@ public Bug_3310_inline_projections_with_quick_append(ITestOutputHelper testOutpu opts.Events.AddEventType(); opts.Projections.Snapshot(SnapshotLifecycle.Inline); opts.Projections.Snapshot(SnapshotLifecycle.Inline); + + // opts.SetApplicationProject(GetType().Assembly); + // opts.GeneratedCodeMode = TypeLoadMode.Auto; + // opts.GeneratedCodeOutputPath = + // AppContext.BaseDirectory.ParentDirectory().ParentDirectory().ParentDirectory().AppendPath("Internal", "Generated"); + }); } + [Fact] + public async Task start_and_append_events_to_same_stream() + { + await using var session = theStore.LightweightSession(tenant); + + session.Logger = new TestOutputMartenLogger(_testOutputHelper); + + var streamId = Guid.NewGuid().ToString(); + + session.Events.StartStream(streamId,new LoadTestEvent(Guid.NewGuid(), 1), + new LoadTestEvent(Guid.NewGuid(), 2), new LoadTestEvent(Guid.NewGuid(), 3)); + await session.SaveChangesAsync(); + + _testOutputHelper.WriteLine("APPEND STARTS HERE"); + + session.Events.Append(streamId, new LoadTestEvent(Guid.NewGuid(), 4), new LoadTestEvent(Guid.NewGuid(), 5)); + await session.SaveChangesAsync(); + + var doc = await session.LoadAsync(streamId); + doc.Version.ShouldBe(5); + + + } + [Fact] public async Task create_1_stream_with_many_events() { await using var session = theStore.LightweightSession(tenant); + session.Logger = new TestOutputMartenLogger(_testOutputHelper); + await Preload(session); var sw = new Stopwatch(); @@ -187,13 +221,13 @@ public record LoadTestEvent(Guid Value, int Count); public record LoadTestUnrelatedEvent; [DocumentAlias("load_testing_inline_projection")] - public record LoadTestInlineProjection + public record LoadTestInlineProjection : IRevisioned { [Identity] public string StreamKey { get; init; } public Guid LastValue { get; init; } public long Sum { get; init; } - [Version] + //[Version] public int Version { get; set; } public LoadTestInlineProjection Apply(LoadTestEvent @event, LoadTestInlineProjection current) diff --git a/src/EventSourcingTests/Internal/Generated/EventStore/EventStorage.cs b/src/EventSourcingTests/Internal/Generated/EventStore/EventStorage.cs deleted file mode 100644 index ecc37085b1..0000000000 --- a/src/EventSourcingTests/Internal/Generated/EventStore/EventStorage.cs +++ /dev/null @@ -1,286 +0,0 @@ -// -#pragma warning disable -using Marten; -using Marten.Events; -using System; - -namespace Marten.Generated.EventStore -{ - // START: GeneratedEventDocumentStorage - public class GeneratedEventDocumentStorage : Marten.Events.EventDocumentStorage - { - private readonly Marten.StoreOptions _options; - - public GeneratedEventDocumentStorage(Marten.StoreOptions options) : base(options) - { - _options = options; - } - - - - public override Marten.Internal.Operations.IStorageOperation AppendEvent(Marten.Events.EventGraph events, Marten.Internal.IMartenSession session, Marten.Events.StreamAction stream, Marten.Events.IEvent e) - { - return new Marten.Generated.EventStore.AppendEventOperation(stream, e); - } - - - public override Marten.Internal.Operations.IStorageOperation InsertStream(Marten.Events.StreamAction stream) - { - return new Marten.Generated.EventStore.GeneratedInsertStream(stream); - } - - - public override Marten.Linq.QueryHandlers.IQueryHandler QueryForStream(Marten.Events.StreamAction stream) - { - return new Marten.Generated.EventStore.GeneratedStreamStateQueryHandler(stream.Id); - } - - - public override Marten.Internal.Operations.IStorageOperation UpdateStreamVersion(Marten.Events.StreamAction stream) - { - return new Marten.Generated.EventStore.GeneratedStreamVersionOperation(stream); - } - - - public override void ApplyReaderDataToEvent(System.Data.Common.DbDataReader reader, Marten.Events.IEvent e) - { - if (!reader.IsDBNull(3)) - { - var sequence = reader.GetFieldValue(3); - e.Sequence = sequence; - } - if (!reader.IsDBNull(4)) - { - var id = reader.GetFieldValue(4); - e.Id = id; - } - var streamId = reader.GetFieldValue(5); - e.StreamId = streamId; - if (!reader.IsDBNull(6)) - { - var version = reader.GetFieldValue(6); - e.Version = version; - } - if (!reader.IsDBNull(7)) - { - var timestamp = reader.GetFieldValue(7); - e.Timestamp = timestamp; - } - if (!reader.IsDBNull(8)) - { - var tenantId = reader.GetFieldValue(8); - e.TenantId = tenantId; - } - var isArchived = reader.GetFieldValue(9); - e.IsArchived = isArchived; - } - - - public override async System.Threading.Tasks.Task ApplyReaderDataToEventAsync(System.Data.Common.DbDataReader reader, Marten.Events.IEvent e, System.Threading.CancellationToken token) - { - if (!(await reader.IsDBNullAsync(3, token).ConfigureAwait(false))) - { - var sequence = await reader.GetFieldValueAsync(3, token).ConfigureAwait(false); - e.Sequence = sequence; - } - if (!(await reader.IsDBNullAsync(4, token).ConfigureAwait(false))) - { - var id = await reader.GetFieldValueAsync(4, token).ConfigureAwait(false); - e.Id = id; - } - var streamId = await reader.GetFieldValueAsync(5, token).ConfigureAwait(false); - e.StreamId = streamId; - if (!(await reader.IsDBNullAsync(6, token).ConfigureAwait(false))) - { - var version = await reader.GetFieldValueAsync(6, token).ConfigureAwait(false); - e.Version = version; - } - if (!(await reader.IsDBNullAsync(7, token).ConfigureAwait(false))) - { - var timestamp = await reader.GetFieldValueAsync(7, token).ConfigureAwait(false); - e.Timestamp = timestamp; - } - if (!(await reader.IsDBNullAsync(8, token).ConfigureAwait(false))) - { - var tenantId = await reader.GetFieldValueAsync(8, token).ConfigureAwait(false); - e.TenantId = tenantId; - } - var isArchived = await reader.GetFieldValueAsync(9, token).ConfigureAwait(false); - e.IsArchived = isArchived; - } - - } - - // END: GeneratedEventDocumentStorage - - - // START: AppendEventOperation - public class AppendEventOperation : Marten.Events.Operations.AppendEventOperationBase - { - private readonly Marten.Events.StreamAction _stream; - private readonly Marten.Events.IEvent _e; - - public AppendEventOperation(Marten.Events.StreamAction stream, Marten.Events.IEvent e) : base(stream, e) - { - _stream = stream; - _e = e; - } - - - public const string SQL = "insert into public.mt_events (data, type, mt_dotnet_type, seq_id, id, stream_id, version, timestamp, tenant_id) values (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - - public override void ConfigureCommand(Weasel.Postgresql.ICommandBuilder builder, Marten.Internal.IMartenSession session) - { - var parameters = builder.AppendWithParameters(SQL); - parameters[0].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Jsonb; - parameters[0].Value = session.Serializer.ToJson(Event.Data); - parameters[1].Value = Event.EventTypeName != null ? (object)Event.EventTypeName : System.DBNull.Value; - parameters[1].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Text; - parameters[2].Value = Event.DotNetTypeName != null ? (object)Event.DotNetTypeName : System.DBNull.Value; - parameters[2].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Text; - parameters[3].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Bigint; - parameters[3].Value = Event.Sequence; - parameters[4].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Uuid; - parameters[4].Value = Event.Id; - parameters[5].Value = Stream.Id; - parameters[5].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Uuid; - parameters[6].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Bigint; - parameters[6].Value = Event.Version; - parameters[7].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.TimestampTz; - parameters[7].Value = Event.Timestamp; - parameters[8].Value = Stream.TenantId != null ? (object)Stream.TenantId : System.DBNull.Value; - parameters[8].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Text; - } - - } - - // END: AppendEventOperation - - - // START: GeneratedInsertStream - public class GeneratedInsertStream : Marten.Events.Operations.InsertStreamBase - { - private readonly Marten.Events.StreamAction _stream; - - public GeneratedInsertStream(Marten.Events.StreamAction stream) : base(stream) - { - _stream = stream; - } - - - public const string SQL = "insert into public.mt_streams (id, type, version, tenant_id) values (?, ?, ?, ?)"; - - - public override void ConfigureCommand(Weasel.Postgresql.ICommandBuilder builder, Marten.Internal.IMartenSession session) - { - var parameters = builder.AppendWithParameters(SQL); - parameters[0].Value = Stream.Id; - parameters[0].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Uuid; - parameters[1].Value = Stream.AggregateTypeName != null ? (object)Stream.AggregateTypeName : System.DBNull.Value; - parameters[1].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Text; - parameters[2].Value = Stream.Version; - parameters[2].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Bigint; - parameters[3].Value = Stream.TenantId != null ? (object)Stream.TenantId : System.DBNull.Value; - parameters[3].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Text; - } - - } - - // END: GeneratedInsertStream - - - // START: GeneratedStreamStateQueryHandler - public class GeneratedStreamStateQueryHandler : Marten.Events.Querying.StreamStateQueryHandler - { - private readonly System.Guid _streamId; - - public GeneratedStreamStateQueryHandler(System.Guid streamId) - { - _streamId = streamId; - } - - - public const string SQL = "select id, version, type, timestamp, created as timestamp, is_archived from public.mt_streams where id = ?"; - - - public override void ConfigureCommand(Weasel.Postgresql.ICommandBuilder builder, Marten.Internal.IMartenSession session) - { - var npgsqlParameterArray = builder.AppendWithParameters(SQL); - npgsqlParameterArray[0].Value = _streamId; - npgsqlParameterArray[0].DbType = System.Data.DbType.Guid; - } - - - public override Marten.Events.StreamState Resolve(Marten.Internal.IMartenSession session, System.Data.Common.DbDataReader reader) - { - var streamState = new Marten.Events.StreamState(); - var id = reader.GetFieldValue(0); - streamState.Id = id; - var version = reader.GetFieldValue(1); - streamState.Version = version; - SetAggregateType(streamState, reader, session); - var lastTimestamp = reader.GetFieldValue(3); - streamState.LastTimestamp = lastTimestamp; - var created = reader.GetFieldValue(4); - streamState.Created = created; - var isArchived = reader.GetFieldValue(5); - streamState.IsArchived = isArchived; - return streamState; - } - - - public override async System.Threading.Tasks.Task ResolveAsync(Marten.Internal.IMartenSession session, System.Data.Common.DbDataReader reader, System.Threading.CancellationToken token) - { - var streamState = new Marten.Events.StreamState(); - var id = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); - streamState.Id = id; - var version = await reader.GetFieldValueAsync(1, token).ConfigureAwait(false); - streamState.Version = version; - await SetAggregateTypeAsync(streamState, reader, session, token).ConfigureAwait(false); - var lastTimestamp = await reader.GetFieldValueAsync(3, token).ConfigureAwait(false); - streamState.LastTimestamp = lastTimestamp; - var created = await reader.GetFieldValueAsync(4, token).ConfigureAwait(false); - streamState.Created = created; - var isArchived = await reader.GetFieldValueAsync(5, token).ConfigureAwait(false); - streamState.IsArchived = isArchived; - return streamState; - } - - } - - // END: GeneratedStreamStateQueryHandler - - - // START: GeneratedStreamVersionOperation - public class GeneratedStreamVersionOperation : Marten.Events.Operations.UpdateStreamVersion - { - private readonly Marten.Events.StreamAction _stream; - - public GeneratedStreamVersionOperation(Marten.Events.StreamAction stream) : base(stream) - { - _stream = stream; - } - - - public const string SQL = "update public.mt_streams set version = ? where id = ? and version = ?"; - - - public override void ConfigureCommand(Weasel.Postgresql.ICommandBuilder builder, Marten.Internal.IMartenSession session) - { - var parameters = builder.AppendWithParameters(SQL); - parameters[0].Value = Stream.Version; - parameters[0].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Bigint; - parameters[1].Value = Stream.Id; - parameters[1].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Uuid; - parameters[2].Value = Stream.ExpectedVersionOnServer; - parameters[2].NpgsqlDbType = NpgsqlTypes.NpgsqlDbType.Bigint; - } - - } - - // END: GeneratedStreamVersionOperation - - -} - diff --git a/src/EventSourcingTests/Internal/Generated/EventStore/MyEventProjectionRuntimeSupport2042637388.cs b/src/EventSourcingTests/Internal/Generated/EventStore/MyEventProjectionRuntimeSupport2042637388.cs deleted file mode 100644 index 270f576530..0000000000 --- a/src/EventSourcingTests/Internal/Generated/EventStore/MyEventProjectionRuntimeSupport2042637388.cs +++ /dev/null @@ -1,39 +0,0 @@ -// -#pragma warning disable -using EventSourcingTests.Bugs; -using System.Linq; - -namespace Marten.Generated.EventStore -{ - // START: MyEventProjectionInlineProjection2042637388 - public class MyEventProjectionInlineProjection2042637388 : Marten.Events.Projections.SyncEventProjection - { - private readonly EventSourcingTests.Bugs.Bug_2438_generated_code_throwing_nre.MyEventProjection _projection; - - public MyEventProjectionInlineProjection2042637388(EventSourcingTests.Bugs.Bug_2438_generated_code_throwing_nre.MyEventProjection projection) : base(projection) - { - _projection = projection; - } - - - public System.Action Project1 {get; set;} - - - public override void ApplyEvent(Marten.IDocumentOperations operations, Marten.Events.StreamAction streamAction, Marten.Events.IEvent e) - { - switch (e) - { - case Marten.Events.IEvent event_MyEvent11272: - Project1.Invoke(event_MyEvent11272.Data, operations); - break; - } - - } - - } - - // END: MyEventProjectionInlineProjection2042637388 - - -} - diff --git a/src/Marten/Events/Aggregation/AggregationRuntime.cs b/src/Marten/Events/Aggregation/AggregationRuntime.cs index 10f1926305..c5c50c6b30 100644 --- a/src/Marten/Events/Aggregation/AggregationRuntime.cs +++ b/src/Marten/Events/Aggregation/AggregationRuntime.cs @@ -117,6 +117,7 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session, return; } + // TODO -- clean this up var storageOperation = Storage.Upsert(aggregate, session, slice.Tenant.TenantId); if (Slicer is ISingleStreamSlicer && lastEvent != null && storageOperation is IRevisionedOperation op) { diff --git a/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs b/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs index 7cd1128179..4a54979449 100644 --- a/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs +++ b/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs @@ -43,6 +43,8 @@ public void Postprocess(DbDataReader reader, IList exceptions) { var values = reader.GetFieldValue(0); + Stream.ApplyEndingVersion(values[0]); + // Ignore the first value for (int i = 1; i < values.Length; i++) { @@ -104,6 +106,8 @@ public async Task PostprocessAsync(DbDataReader reader, IList excepti { var values = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + Stream.ApplyEndingVersion(values[0]); + // Ignore the first value for (int i = 1; i < values.Length; i++) { diff --git a/src/Marten/Events/Projections/ProjectionDocumentPolicy.cs b/src/Marten/Events/Projections/ProjectionDocumentPolicy.cs index 6b76998c4d..2dc0d6114e 100644 --- a/src/Marten/Events/Projections/ProjectionDocumentPolicy.cs +++ b/src/Marten/Events/Projections/ProjectionDocumentPolicy.cs @@ -19,6 +19,7 @@ public void Apply(DocumentMapping mapping) mapping.UseOptimisticConcurrency = false; mapping.Metadata.Version.Enabled = false; mapping.UseNumericRevisions = true; + mapping.UseVersionFromMatchingStream = true; mapping.Metadata.Revision.Enabled = true; } } diff --git a/src/Marten/Events/StreamAction.cs b/src/Marten/Events/StreamAction.cs index 4e44f52be9..387354819b 100644 --- a/src/Marten/Events/StreamAction.cs +++ b/src/Marten/Events/StreamAction.cs @@ -282,23 +282,26 @@ internal void PrepareEvents(long currentVersion, EventGraph graph, Queue s var i = currentVersion; // Augment the events before checking expected versions, this allows the sequence/etc to properly be set on the resulting tombstone events - foreach (var @event in _events) + if (graph.AppendMode == EventAppendMode.Rich || ActionType == StreamActionType.Start) { - @event.Version = ++i; - if (@event.Id == Guid.Empty) + foreach (var @event in _events) { - @event.Id = CombGuidIdGeneration.NewGuid(); - } + @event.Version = ++i; + if (@event.Id == Guid.Empty) + { + @event.Id = CombGuidIdGeneration.NewGuid(); + } - if (sequences.TryDequeue(out var sequence)) - { - @event.Sequence = sequence; - } + if (sequences.TryDequeue(out var sequence)) + { + @event.Sequence = sequence; + } - @event.TenantId = session.TenantId; - @event.Timestamp = timestamp; + @event.TenantId = session.TenantId; + @event.Timestamp = timestamp; - ProcessMetadata(@event, graph, session); + ProcessMetadata(@event, graph, session); + } } if (currentVersion != 0) @@ -397,4 +400,14 @@ internal IEnumerable> ToTombstoneEvents(EventMapping mapping, T DotNetTypeName = mapping.DotNetTypeName }); } + + public void ApplyEndingVersion(long endingVersion) + { + var version = endingVersion; + foreach (var e in Events.Reverse()) + { + e.Version = version; + version--; + } + } } diff --git a/src/Marten/Internal/UnitOfWork.cs b/src/Marten/Internal/UnitOfWork.cs index f64b4ecdb9..70d53e047c 100644 --- a/src/Marten/Internal/UnitOfWork.cs +++ b/src/Marten/Internal/UnitOfWork.cs @@ -52,7 +52,7 @@ public void Add(IStorageOperation operation) } } - public IReadOnlyList AllOperations => _operations.Concat(_eventOperations).ToList(); + public IReadOnlyList AllOperations => _eventOperations.Concat(_operations).ToList(); public void Sort(StoreOptions options) { diff --git a/src/Marten/Internal/UpdateBatch.cs b/src/Marten/Internal/UpdateBatch.cs index f92ec2ee24..ca7822fac8 100644 --- a/src/Marten/Internal/UpdateBatch.cs +++ b/src/Marten/Internal/UpdateBatch.cs @@ -9,7 +9,6 @@ namespace Marten.Internal; public class UpdateBatch: IUpdateBatch { - private readonly IList _exceptions = new List(); private readonly IReadOnlyList _operations; public UpdateBatch(IReadOnlyList operations) diff --git a/src/Marten/Schema/DocumentMapping.cs b/src/Marten/Schema/DocumentMapping.cs index 5e74ac2a74..2eaab7bf20 100644 --- a/src/Marten/Schema/DocumentMapping.cs +++ b/src/Marten/Schema/DocumentMapping.cs @@ -123,6 +123,12 @@ public DocumentMapping(Type documentType, StoreOptions storeOptions) internal DocumentSchema Schema => _schema.Value; + /// + /// This is a workaround for the quick append + inline projection + /// issue + /// + public bool UseVersionFromMatchingStream { get; set; } + public HiloSettings HiloSettings { get => _hiloSettings; diff --git a/src/Marten/Storage/UpsertFunction.cs b/src/Marten/Storage/UpsertFunction.cs index b2743b659c..a1f5a5d23d 100644 --- a/src/Marten/Storage/UpsertFunction.cs +++ b/src/Marten/Storage/UpsertFunction.cs @@ -24,6 +24,10 @@ internal class UpsertFunction: Function public readonly IList Arguments = new List(); protected readonly string _primaryKeyFields; + protected readonly string _versionSourceTable; + protected readonly string _versionColumnName; + protected readonly string _tenantVersionWhereClause; + protected readonly string _andTenantVersionWhereClause; public UpsertFunction(DocumentMapping mapping, DbObjectName identifier = null, bool disableConcurrency = false): base(identifier ?? mapping.UpsertFunction) @@ -82,11 +86,29 @@ public UpsertFunction(DocumentMapping mapping, DbObjectName identifier = null, b Arguments.Add(new RevisionArgument()); } + if (_mapping.UseVersionFromMatchingStream) + { + _versionSourceTable = $"{_mapping.StoreOptions.Events.DatabaseSchemaName}.mt_streams"; + _versionColumnName = "version"; + } + else + { + _versionSourceTable = _tableName.QualifiedName; + _versionColumnName = "mt_version"; + } + if (mapping.TenancyStyle == TenancyStyle.Conjoined) { Arguments.Add(new TenantIdArgument()); _tenantWhereClause = $"{_tableName.QualifiedName}.{TenantIdColumn.Name} = {TenantIdArgument.ArgName}"; _andTenantWhereClause = $" and {_tenantWhereClause}"; + + + if (_mapping.UseVersionFromMatchingStream) + { + _tenantVersionWhereClause = $"{_versionSourceTable}.{TenantIdColumn.Name} = {TenantIdArgument.ArgName}"; + _andTenantVersionWhereClause = $" and {_tenantVersionWhereClause}"; + } } _primaryKeyFields = table.Columns.Where(x => x.IsPrimaryKey).Select(x => x.Name).Join(", "); @@ -163,6 +185,10 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s string inserts, string valueList, string updates) { + var revisionModification = _mapping.UseVersionFromMatchingStream + ? "revision = current_version;" + : "revision = current_version + 1;"; + if (_mapping.Metadata.Revision.Enabled) { writer.WriteLine($@" @@ -174,10 +200,10 @@ protected virtual void writeFunction(TextWriter writer, string argList, string s current_version INTEGER; BEGIN -if revision = 1 then - SELECT mt_version FROM {_tableName.QualifiedName} into current_version WHERE id = docId {_andTenantWhereClause}; +if revision <= 1 then + SELECT {_versionColumnName} FROM {_versionSourceTable} into current_version WHERE id = docId {_andTenantVersionWhereClause}; if current_version is not null then - revision = current_version + 1; + {revisionModification} end if; end if;