diff --git a/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs index 78ed131174c..3f1b6f62643 100644 --- a/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs @@ -79,6 +79,23 @@ public CosmosClientWrapper( _enableContentResponseOnWrite = options.EnableContentResponseOnWrite; } + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public static Stream Serialize(JToken document) + { + var stream = new MemoryStream(); + using var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: true); + + using var jsonWriter = new JsonTextWriter(writer); + CosmosClientWrapper.Serializer.Serialize(jsonWriter, document); + jsonWriter.Flush(); + return stream; + } + /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to /// the same compatibility standards as public APIs. It may be changed or removed without notice in @@ -401,14 +418,7 @@ private static async Task CreateItemOnceAsync( (string ContainerId, JToken Document, IUpdateEntry Entry, CosmosClientWrapper Wrapper) parameters, CancellationToken cancellationToken = default) { - var stream = new MemoryStream(); - await using var __ = stream.ConfigureAwait(false); - var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: false); - await using var ___ = writer.ConfigureAwait(false); - - using var jsonWriter = new JsonTextWriter(writer); - Serializer.Serialize(jsonWriter, parameters.Document); - await jsonWriter.FlushAsync(cancellationToken).ConfigureAwait(false); + using var stream = Serialize(parameters.Document); var entry = parameters.Entry; var wrapper = parameters.Wrapper; @@ -493,13 +503,7 @@ private static async Task ReplaceItemOnceAsync( (string ContainerId, string ResourceId, JObject Document, IUpdateEntry Entry, CosmosClientWrapper Wrapper) parameters, CancellationToken cancellationToken = default) { - var stream = new MemoryStream(); - await using var __ = stream.ConfigureAwait(false); - var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: false); - await using var ___ = writer.ConfigureAwait(false); - using var jsonWriter = new JsonTextWriter(writer); - Serializer.Serialize(jsonWriter, parameters.Document); - await jsonWriter.FlushAsync(cancellationToken).ConfigureAwait(false); + using var stream = Serialize(parameters.Document); var entry = parameters.Entry; var wrapper = parameters.Wrapper; @@ -592,12 +596,12 @@ public virtual PartitionKey GetPartitionKeyValue(IUpdateEntry updateEntry) /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public virtual ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string containerId, PartitionKey partitionKeyValue) + public virtual ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string containerId, PartitionKey partitionKeyValue, bool checkSize) { var container = Client.GetDatabase(_databaseId).GetContainer(containerId); var batch = container.CreateTransactionalBatch(partitionKeyValue); - return new CosmosTransactionalBatchWrapper(batch, containerId, partitionKeyValue, _enableContentResponseOnWrite); + return new CosmosTransactionalBatchWrapper(batch, containerId, partitionKeyValue, checkSize, _enableContentResponseOnWrite); } /// @@ -606,7 +610,7 @@ public virtual ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public virtual CosmosTransactionalBatchResult ExecuteBatch(ICosmosTransactionalBatchWrapper batch) + public virtual CosmosTransactionalBatchResult ExecuteTransactionalBatch(ICosmosTransactionalBatchWrapper batch) { _databaseLogger.SyncNotSupported(); @@ -623,7 +627,7 @@ private static CosmosTransactionalBatchResult ExecuteBatchOnce(DbContext _, /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public virtual Task ExecuteBatchAsync(ICosmosTransactionalBatchWrapper batch, CancellationToken cancellationToken = default) + public virtual Task ExecuteTransactionalBatchAsync(ICosmosTransactionalBatchWrapper batch, CancellationToken cancellationToken = default) => _executionStrategy.ExecuteAsync((batch, this), ExecuteBatchOnceAsync, null, cancellationToken); private static async Task ExecuteBatchOnceAsync(DbContext _, diff --git a/src/EFCore.Cosmos/Storage/Internal/CosmosDatabaseWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/CosmosDatabaseWrapper.cs index 2dc58dfd523..a8608b8e48b 100644 --- a/src/EFCore.Cosmos/Storage/Internal/CosmosDatabaseWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/CosmosDatabaseWrapper.cs @@ -74,31 +74,43 @@ public override int SaveChanges(IList entries) } } - foreach (var batch in groups.Batches) + foreach (var batch in groups.BatchableUpdateEntries) { - var transaction = CreateTransaction(batch); + if (batch.UpdateEntries.Count == 1 && _currentDbContext.Context.Database.AutoTransactionBehavior != AutoTransactionBehavior.Always) + { + if (Save(batch.UpdateEntries[0])) + { + rowsAffected++; + } - try + continue; + } + + foreach (var transaction in CreateTransactions(batch)) { - var response = _cosmosClient.ExecuteBatch(transaction); - if (!response.IsSuccess) + try { - var exception = WrapUpdateException(response.Exception, response.ErroredEntries); - if (exception is not DbUpdateConcurrencyException - || !Dependencies.Logger.OptimisticConcurrencyException( - batch.Items.First().Entry.Context, batch.Items.Select(x => x.Entry).ToArray(), (DbUpdateConcurrencyException)exception, null).IsSuppressed) + var response = _cosmosClient.ExecuteTransactionalBatch(transaction); + if (!response.IsSuccess) { - throw exception; + var exception = WrapUpdateException(response.Exception, response.ErroredEntries); + if (exception is not DbUpdateConcurrencyException + || !Dependencies.Logger.OptimisticConcurrencyException( + transaction.Entries.First().Entry.Context, transaction.Entries.Select(x => x.Entry).ToArray(), (DbUpdateConcurrencyException)exception, null) + .IsSuppressed) + { + throw exception; + } } } - } - catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException) - { - var exception = WrapUpdateException(ex, batch.Items.Select(x => x.Entry).ToArray()); - throw exception; - } + catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException) + { + var exception = WrapUpdateException(ex, transaction.Entries.Select(x => x.Entry).ToArray()); + throw exception; + } - rowsAffected += batch.Items.Count; + rowsAffected += transaction.Entries.Count; + } } return rowsAffected; @@ -130,32 +142,43 @@ public override async Task SaveChangesAsync( } } - foreach (var batch in groups.Batches) + foreach (var batch in groups.BatchableUpdateEntries) { - var transaction = CreateTransaction(batch); + if (batch.UpdateEntries.Count == 1 && _currentDbContext.Context.Database.AutoTransactionBehavior != AutoTransactionBehavior.Always) + { + if (await SaveAsync(batch.UpdateEntries[0], cancellationToken).ConfigureAwait(false)) + { + rowsAffected++; + } - try + continue; + } + + foreach (var transaction in CreateTransactions(batch)) { - var response = await _cosmosClient.ExecuteBatchAsync(transaction, cancellationToken).ConfigureAwait(false); - if (!response.IsSuccess) + try { - var exception = WrapUpdateException(response.Exception, response.ErroredEntries); - if (exception is not DbUpdateConcurrencyException - || !(await Dependencies.Logger.OptimisticConcurrencyExceptionAsync( - batch.Items.First().Entry.Context, batch.Items.Select(x => x.Entry).ToArray(), (DbUpdateConcurrencyException)exception, null, cancellationToken) - .ConfigureAwait(false)).IsSuppressed) + var response = await _cosmosClient.ExecuteTransactionalBatchAsync(transaction, cancellationToken).ConfigureAwait(false); + if (!response.IsSuccess) { - throw exception; + var exception = WrapUpdateException(response.Exception, response.ErroredEntries); + if (exception is not DbUpdateConcurrencyException + || !(await Dependencies.Logger.OptimisticConcurrencyExceptionAsync( + transaction.Entries.First().Entry.Context, transaction.Entries.Select(x => x.Entry).ToArray(), (DbUpdateConcurrencyException)exception, null, cancellationToken) + .ConfigureAwait(false)).IsSuppressed) + { + throw exception; + } } } - } - catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException) - { - var exception = WrapUpdateException(ex, batch.Items.Select(x => x.Entry).ToArray()); - throw exception; - } + catch (Exception ex) when (ex is not DbUpdateException and not OperationCanceledException) + { + var exception = WrapUpdateException(ex, transaction.Entries.Select(x => x.Entry).ToArray()); + throw exception; + } - rowsAffected += batch.Items.Count; + rowsAffected += transaction.Entries.Count; + } } return rowsAffected; @@ -196,7 +219,7 @@ private SaveGroups CreateSaveGroups(IList entries) { return new SaveGroups { - Batches = [], + BatchableUpdateEntries = Array.Empty<(Grouping Key, List UpdateEntries)>(), SingleUpdateEntries = cosmosUpdateEntries }; } @@ -215,30 +238,57 @@ private SaveGroups CreateSaveGroups(IList entries) } } - if (_currentDbContext.Context.Database.AutoTransactionBehavior == AutoTransactionBehavior.Always && - entriesWithTriggers.Count >= 1 && rootEntriesToSave.Count >= 2) + if (entriesWithTriggers.Count == 0 && entriesWithoutTriggers.Count == 0) { - throw new InvalidOperationException(CosmosStrings.SaveChangesAutoTransactionBehaviorAlwaysTriggerAtomicity); + return new SaveGroups { BatchableUpdateEntries = [], SingleUpdateEntries = entriesWithTriggers }; } - var batches = CreateBatches(entriesWithoutTriggers).ToArray(); - - if (_currentDbContext.Context.Database.AutoTransactionBehavior == AutoTransactionBehavior.Always && - batches.Length > 1) + if (_currentDbContext.Context.Database.AutoTransactionBehavior == AutoTransactionBehavior.Always) { - throw new InvalidOperationException(CosmosStrings.SaveChangesAutoTransactionBehaviorAlwaysAtomicity); + if (entriesWithTriggers.Count >= 1) + { + if (rootEntriesToSave.Count >= 2) + { + throw new InvalidOperationException(CosmosStrings.SaveChangesAutoTransactionBehaviorAlwaysTriggerAtomicity); + } + + // There is only 1 entry, and it has a trigger + return new SaveGroups + { + BatchableUpdateEntries = [], + SingleUpdateEntries = entriesWithTriggers + }; + } + + var firstEntry = entriesWithoutTriggers[0]; + var key = new Grouping(firstEntry.CollectionId, _cosmosClient.GetPartitionKeyValue(firstEntry.Entry)); + if (entriesWithoutTriggers.Count > 100 || + !entriesWithoutTriggers.All(entry => + entry.CollectionId == key.ContainerId && + _cosmosClient.GetPartitionKeyValue(entry.Entry) == key.PartitionKeyValue)) + { + throw new InvalidOperationException(CosmosStrings.SaveChangesAutoTransactionBehaviorAlwaysAtomicity); + } + + return new SaveGroups + { + BatchableUpdateEntries = [(key, entriesWithoutTriggers)], + SingleUpdateEntries = [] + }; } + + var batches = CreateBatches(entriesWithoutTriggers); return new SaveGroups { - Batches = batches, + BatchableUpdateEntries = batches, SingleUpdateEntries = entriesWithTriggers }; } - private IEnumerable<(Grouping Key, List Items)> CreateBatches(List entries) + private List<(Grouping Key, List UpdateEntries)> CreateBatches(List entries) { - const int maxOperationsPerBatch = 100; + var results = new List<(Grouping Key, List UpdateEntries)>(); var buckets = new Dictionary>(); foreach (var entry in entries) @@ -248,26 +298,14 @@ private SaveGroups CreateSaveGroups(IList entries) ref var list = ref CollectionsMarshal.GetValueRefOrAddDefault(buckets, key, out var exists); if (!exists || list is null) { - list = new(); + list = []; + results.Add((key, list)); } list.Add(entry); - - if (list.Count == maxOperationsPerBatch) - { - var listCopy = list; - list = null; - yield return (key, listCopy); - } } - foreach (var kvp in buckets) - { - if (kvp.Value != null) - { - yield return (kvp.Key, kvp.Value); - } - } + return results; } private CosmosUpdateEntry? CreateCosmosUpdateEntry(IUpdateEntry entry) @@ -426,32 +464,52 @@ private SaveGroups CreateSaveGroups(IList entries) }; } - private ICosmosTransactionalBatchWrapper CreateTransaction((Grouping Key, List Items) batch) + private IEnumerable CreateTransactions((Grouping Key, List UpdateEntries) batch) { - var transaction = _cosmosClient.CreateTransactionalBatch(batch.Key.ContainerId, batch.Key.PartitionKeyValue); + const int maxOperationsPerBatch = 100; - foreach (var updateEntry in batch.Items) + // We turn off size checking in EF for AutoTransactionBehavior.Always as all entities will always go in a single transaction. + // Cosmos will throw if the request is too large. + var checkSize = _currentDbContext.Context.Database.AutoTransactionBehavior != AutoTransactionBehavior.Always; + var transaction = _cosmosClient.CreateTransactionalBatch(batch.Key.ContainerId, batch.Key.PartitionKeyValue, checkSize); + + foreach (var updateEntry in batch.UpdateEntries) { - switch (updateEntry.Operation) + // Stream is disposed by Transaction.ExecuteAsync + var stream = updateEntry.Document != null ? CosmosClientWrapper.Serialize(updateEntry.Document) : null; + + // With AutoTransactionBehavior.Always, AddToTransaction will always return true. + if (!AddToTransaction(transaction, updateEntry, stream)) + { + yield return transaction; + transaction = _cosmosClient.CreateTransactionalBatch(batch.Key.ContainerId, batch.Key.PartitionKeyValue, checkSize); + AddToTransaction(transaction, updateEntry, stream); + continue; + } + + if (checkSize && transaction.Entries.Count == maxOperationsPerBatch) { - case CosmosCudOperation.Create: - transaction.CreateItem(updateEntry.Document!, updateEntry.Entry); - break; - case CosmosCudOperation.Update: - transaction.ReplaceItem( - updateEntry.DocumentSource.GetId(updateEntry.Entry.SharedIdentityEntry ?? updateEntry.Entry), - updateEntry.Document!, - updateEntry.Entry); - break; - case CosmosCudOperation.Delete: - transaction.DeleteItem(updateEntry.DocumentSource.GetId(updateEntry.Entry), updateEntry.Entry); - break; - default: - throw new UnreachableException(); + yield return transaction; + transaction = _cosmosClient.CreateTransactionalBatch(batch.Key.ContainerId, batch.Key.PartitionKeyValue, checkSize); } } - return transaction; + if (transaction.Entries.Count != 0) + { + yield return transaction; + } + } + + private bool AddToTransaction(ICosmosTransactionalBatchWrapper transaction, CosmosUpdateEntry updateEntry, Stream? stream) + { + var id = updateEntry.DocumentSource.GetId(updateEntry.Entry.SharedIdentityEntry ?? updateEntry.Entry); + return updateEntry.Operation switch + { + CosmosCudOperation.Create => transaction.CreateItem(id, stream!, updateEntry.Entry), + CosmosCudOperation.Update => transaction.ReplaceItem(id, stream!, updateEntry.Entry), + CosmosCudOperation.Delete => transaction.DeleteItem(id, updateEntry.Entry), + _ => throw new UnreachableException(), + }; } private bool Save(CosmosUpdateEntry updateEntry) @@ -593,8 +651,8 @@ private DbUpdateException WrapUpdateException(Exception exception, IReadOnlyList private sealed class SaveGroups { public required IEnumerable SingleUpdateEntries { get; init; } - - public required IEnumerable<(Grouping Key, List Items)> Batches { get; init; } + + public required IEnumerable<(Grouping Key, List UpdateEntries)> BatchableUpdateEntries { get; init; } } private sealed class CosmosUpdateEntry diff --git a/src/EFCore.Cosmos/Storage/Internal/CosmosTransactionalBatchWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/CosmosTransactionalBatchWrapper.cs index b739c3a245b..f2f9fead506 100644 --- a/src/EFCore.Cosmos/Storage/Internal/CosmosTransactionalBatchWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/CosmosTransactionalBatchWrapper.cs @@ -2,8 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Text; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; namespace Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal; @@ -15,9 +13,15 @@ namespace Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal; /// public class CosmosTransactionalBatchWrapper : ICosmosTransactionalBatchWrapper { + private const int OperationSerializationOverheadOverEstimateInBytes = 200; + private const int MaxSize = 2_097_152; // 2MiB + + private long _size; + private readonly TransactionalBatch _transactionalBatch; private readonly string _collectionId; private readonly PartitionKey _partitionKeyValue; + private readonly bool _checkSize; private readonly bool? _enableContentResponseOnWrite; private readonly List _entries = new(); @@ -27,11 +31,17 @@ public class CosmosTransactionalBatchWrapper : ICosmosTransactionalBatchWrapper /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public CosmosTransactionalBatchWrapper(TransactionalBatch transactionalBatch, string collectionId, PartitionKey partitionKeyValue, bool? enableContentResponseOnWrite) + public CosmosTransactionalBatchWrapper( + TransactionalBatch transactionalBatch, + string collectionId, + PartitionKey partitionKeyValue, + bool checkSize, + bool? enableContentResponseOnWrite) { _transactionalBatch = transactionalBatch; _collectionId = collectionId; _partitionKeyValue = partitionKeyValue; + _checkSize = checkSize; _enableContentResponseOnWrite = enableContentResponseOnWrite; } @@ -65,19 +75,25 @@ public CosmosTransactionalBatchWrapper(TransactionalBatch transactionalBatch, st /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public void CreateItem(JToken document, IUpdateEntry updateEntry) + public bool CreateItem(string id, Stream stream, IUpdateEntry updateEntry) { - // stream is disposed by TransactionalBatch.ExecuteAsync - var stream = new MemoryStream(); - using var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: true); + var itemRequestOptions = CreateItemRequestOptions(updateEntry, _enableContentResponseOnWrite, out var itemRequestOptionsLength); + + if (_checkSize) + { + var size = stream.Length + itemRequestOptionsLength + OperationSerializationOverheadOverEstimateInBytes; - using var jsonWriter = new JsonTextWriter(writer); - CosmosClientWrapper.Serializer.Serialize(jsonWriter, document); - jsonWriter.Flush(); + if (_size + size > MaxSize && _size != 0) + { + return false; + } + _size += size; + } - var itemRequestOptions = CreateItemRequestOptions(updateEntry, _enableContentResponseOnWrite); _transactionalBatch.CreateItemStream(stream, itemRequestOptions); - _entries.Add(new CosmosTransactionalBatchEntry(updateEntry, CosmosCudOperation.Create, document["id"]!.ToString())); + _entries.Add(new CosmosTransactionalBatchEntry(updateEntry, CosmosCudOperation.Create, id)); + + return true; } /// @@ -86,20 +102,25 @@ public void CreateItem(JToken document, IUpdateEntry updateEntry) /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public void ReplaceItem(string documentId, JToken document, IUpdateEntry updateEntry) + public bool ReplaceItem(string documentId, Stream stream, IUpdateEntry updateEntry) { - // stream is disposed by TransactionalBatch.ExecuteAsync - var stream = new MemoryStream(); - var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: true); - using var _ = writer; - using var jsonWriter = new JsonTextWriter(writer); - CosmosClientWrapper.Serializer.Serialize(jsonWriter, document); - jsonWriter.Flush(); + var itemRequestOptions = CreateItemRequestOptions(updateEntry, _enableContentResponseOnWrite, out var itemRequestOptionsLength); + + if (_checkSize) + { + var size = stream.Length + itemRequestOptionsLength + OperationSerializationOverheadOverEstimateInBytes + Encoding.UTF8.GetByteCount(documentId); - var itemRequestOptions = CreateItemRequestOptions(updateEntry, _enableContentResponseOnWrite); + if (_size + size > MaxSize && _size != 0) + { + return false; + } + _size += size; + } _transactionalBatch.ReplaceItemStream(documentId, stream, itemRequestOptions); - _entries.Add(new CosmosTransactionalBatchEntry(updateEntry, CosmosCudOperation.Update, document["id"]!.ToString())); + _entries.Add(new CosmosTransactionalBatchEntry(updateEntry, CosmosCudOperation.Update, documentId)); + + return true; } /// @@ -108,11 +129,25 @@ public void ReplaceItem(string documentId, JToken document, IUpdateEntry updateE /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public void DeleteItem(string documentId, IUpdateEntry updateEntry) + public bool DeleteItem(string documentId, IUpdateEntry updateEntry) { - var itemRequestOptions = CreateItemRequestOptions(updateEntry, _enableContentResponseOnWrite); + var itemRequestOptions = CreateItemRequestOptions(updateEntry, _enableContentResponseOnWrite, out var itemRequestOptionsLength); + + if (_checkSize) + { + var size = itemRequestOptionsLength + OperationSerializationOverheadOverEstimateInBytes + Encoding.UTF8.GetByteCount(documentId); + + if (_size + size > MaxSize && _size != 0) + { + return false; + } + _size += size; + } + _transactionalBatch.DeleteItem(documentId, itemRequestOptions); _entries.Add(new CosmosTransactionalBatchEntry(updateEntry, CosmosCudOperation.Delete, documentId)); + + return true; } /// @@ -123,12 +158,22 @@ public void DeleteItem(string documentId, IUpdateEntry updateEntry) /// public TransactionalBatch GetTransactionalBatch() => _transactionalBatch; - private static TransactionalBatchItemRequestOptions? CreateItemRequestOptions(IUpdateEntry entry, bool? enableContentResponseOnWrite) + private TransactionalBatchItemRequestOptions? CreateItemRequestOptions(IUpdateEntry entry, bool? enableContentResponseOnWrite, out int size) { var helper = RequestOptionsHelper.Create(entry, enableContentResponseOnWrite); + size = 0; + + if (helper == null) + { + return null; + } + + if (_checkSize && helper.IfMatchEtag != null) + { + size += helper.IfMatchEtag.Length; + } - return helper == null - ? null - : new TransactionalBatchItemRequestOptions { IfMatchEtag = helper.IfMatchEtag, EnableContentResponseOnWrite = helper.EnableContentResponseOnWrite }; + // EnableContentResponseOnWrite is a header so no request body size for that. + return new TransactionalBatchItemRequestOptions { IfMatchEtag = helper.IfMatchEtag, EnableContentResponseOnWrite = helper.EnableContentResponseOnWrite }; } } diff --git a/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs index b7f9bcaaf1f..3c3d545ec90 100644 --- a/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs @@ -208,7 +208,7 @@ IAsyncEnumerable ExecuteSqlQueryAsync( /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string containerId, PartitionKey partitionKeyValue); + ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string containerId, PartitionKey partitionKeyValue, bool checkSize); /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to @@ -216,7 +216,7 @@ IAsyncEnumerable ExecuteSqlQueryAsync( /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - Task ExecuteBatchAsync(ICosmosTransactionalBatchWrapper batch, CancellationToken cancellationToken = default); + Task ExecuteTransactionalBatchAsync(ICosmosTransactionalBatchWrapper batch, CancellationToken cancellationToken = default); /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to @@ -224,5 +224,5 @@ IAsyncEnumerable ExecuteSqlQueryAsync( /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - CosmosTransactionalBatchResult ExecuteBatch(ICosmosTransactionalBatchWrapper batch); + CosmosTransactionalBatchResult ExecuteTransactionalBatch(ICosmosTransactionalBatchWrapper batch); } diff --git a/src/EFCore.Cosmos/Storage/Internal/ICosmosTransactionalBatchWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/ICosmosTransactionalBatchWrapper.cs index 8afcab41e98..50615843c52 100644 --- a/src/EFCore.Cosmos/Storage/Internal/ICosmosTransactionalBatchWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/ICosmosTransactionalBatchWrapper.cs @@ -43,7 +43,7 @@ public interface ICosmosTransactionalBatchWrapper /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - void CreateItem(JToken document, IUpdateEntry updateEntry); + bool CreateItem(string id, Stream stream, IUpdateEntry updateEntry); /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to @@ -51,7 +51,7 @@ public interface ICosmosTransactionalBatchWrapper /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - void DeleteItem(string documentId, IUpdateEntry updateEntry); + bool DeleteItem(string documentId, IUpdateEntry updateEntry); /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to @@ -59,7 +59,7 @@ public interface ICosmosTransactionalBatchWrapper /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - void ReplaceItem(string documentId, JToken document, IUpdateEntry updateEntry); + bool ReplaceItem(string documentId, Stream stream, IUpdateEntry updateEntry); /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to diff --git a/test/EFCore.Cosmos.FunctionalTests/CosmosTransactionalBatchTest.cs b/test/EFCore.Cosmos.FunctionalTests/CosmosTransactionalBatchTest.cs index a1840c7c90c..9ab10188569 100644 --- a/test/EFCore.Cosmos.FunctionalTests/CosmosTransactionalBatchTest.cs +++ b/test/EFCore.Cosmos.FunctionalTests/CosmosTransactionalBatchTest.cs @@ -133,6 +133,22 @@ public virtual async Task SaveChanges_transactionbehavior_always_fails_for_multi Assert.Equal(0, customersCount); } + [ConditionalFact] + public virtual async Task SaveChanges_succeeds_for_101_entities_in_same_partition() + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + + context.Customers.AddRange(Enumerable.Range(0, 101).Select(x => new Customer { Id = x.ToString(), PartitionKey = "1" })); + + await context.SaveChangesAsync(); + + using var assertContext = contextFactory.CreateContext(); + var customersCount = await assertContext.Customers.CountAsync(); + Assert.Equal(101, customersCount); + } + [ConditionalFact] public virtual async Task SaveChanges_transactionbehavior_always_fails_for_101_entities_in_same_partition() { @@ -251,6 +267,395 @@ public virtual async Task SaveChanges_transactionbehavior_always_fails_for_singl Assert.Equal(0, customersCount); } + [ConditionalFact] + public virtual async Task SaveChanges_three_1mb_entries_succeeds() + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + + context.Customers.Add(new Customer { Id = "1", Name = new string('x', 1_000_000), PartitionKey = "1" }); + context.Customers.Add(new Customer { Id = "2", Name = new string('x', 1_000_000), PartitionKey = "1" }); + context.Customers.Add(new Customer { Id = "3", Name = new string('x', 1_000_000), PartitionKey = "1" }); + + await context.SaveChangesAsync(); + + using var assertContext = contextFactory.CreateContext(); + var customersCount = await assertContext.Customers.CountAsync(); + Assert.Equal(3, customersCount); + } + + [ConditionalFact] + public virtual async Task SaveChanges_entity_too_large_throws() + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + + context.Customers.Add(new Customer { Id = "1", Name = new string('x', 50_000_000), PartitionKey = "1" }); + + var exception = await Assert.ThrowsAnyAsync(() => context.SaveChangesAsync()); + Assert.NotNull(exception.InnerException); + var cosmosException = Assert.IsAssignableFrom(exception.InnerException); + Assert.Equal(HttpStatusCode.RequestEntityTooLarge, cosmosException.StatusCode); + + using var assertContext = contextFactory.CreateContext(); + var customersCount = await assertContext.Customers.CountAsync(); + Assert.Equal(0, customersCount); + } + + [ConditionalTheory, InlineData(true), InlineData(false)] + public virtual async Task SaveChanges_exactly_2_mib_does_not_split_and_one_byte_over_splits(bool oneByteOver) + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + + var customer1 = new Customer { Id = new string('x', 1023), PartitionKey = new string('x', 1023) }; + var customer2 = new Customer { Id = new string('y', 1023), PartitionKey = new string('x', 1023) }; + + context.Customers.Add(customer1); + context.Customers.Add(customer2); + + await context.SaveChangesAsync(); + ListLoggerFactory.Clear(); + + customer1.Name = new string('x', 1044994); + customer2.Name = new string('x', 1044994); + + if (oneByteOver) + { + customer1.Name += 'x'; + } + + await context.SaveChangesAsync(); + using var assertContext = contextFactory.CreateContext(); + Assert.Equal(2, (await context.Customers.ToListAsync()).Count); + + if (oneByteOver) + { + Assert.Equal(2, ListLoggerFactory.Log.Count(x => x.Id == CosmosEventId.ExecutedTransactionalBatch)); + } + else + { + Assert.Equal(1, ListLoggerFactory.Log.Count(x => x.Id == CosmosEventId.ExecutedTransactionalBatch)); + } + } + + [ConditionalFact] + public virtual async Task SaveChanges_too_large_entry_after_smaller_throws_after_saving_smaller() + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + + context.Customers.Add(new Customer { Id = "1", Name = new string('x', 1_000_000), PartitionKey = "1" }); + context.Customers.Add(new Customer { Id = "2", Name = new string('x', 50_000_000), PartitionKey = "1" }); + + await Assert.ThrowsAsync(() => context.SaveChangesAsync()); + + using var assertContext = contextFactory.CreateContext(); + var customersCount = await assertContext.Customers.CountAsync(); + Assert.Equal(1, customersCount); + Assert.Equal("1", (await assertContext.Customers.FirstAsync()).Id); + } + + [ConditionalFact] + public virtual async Task SaveChanges_transaction_behaviour_always_payload_larger_than_cosmos_limit_throws() + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Always; + + context.Customers.Add(new Customer { Id = "1", Name = new string('x', 50_000_000 / 2), PartitionKey = "1" }); + context.Customers.Add(new Customer { Id = "2", Name = new string('x', 50_000_000 / 2), PartitionKey = "1" }); + + var exception = await Assert.ThrowsAnyAsync(() => context.SaveChangesAsync()); + Assert.NotNull(exception.InnerException); + var cosmosException = Assert.IsAssignableFrom(exception.InnerException); + Assert.Equal(HttpStatusCode.RequestEntityTooLarge, cosmosException.StatusCode); + + using var assertContext = contextFactory.CreateContext(); + var customersCount = await assertContext.Customers.CountAsync(); + Assert.Equal(0, customersCount); + } + + // The tests below will fail if the cosmos db sdk is updated and the serialization logic for transactional batches has changed + + [ConditionalTheory, InlineData(true), InlineData(false)] + public virtual async Task SaveChanges_transaction_behaviour_always_single_entity_payload_can_be_exactly_cosmos_limit_and_throws_when_1byte_over(bool oneByteOver) + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Always; + + var customer = new Customer { Id = new string('x', 1_000), PartitionKey = new string('x', 1_000) }; + + context.Customers.Add(customer); + await context.SaveChangesAsync(); + + // Total document size will be: 2_097_503. Total request size will be: 2_098_541 + // Normally 2MiB is 2_097_152, but cosmos appears to allow ~1Kib (1389 bytes) extra + var str = new string('x', 2_095_228); + customer.Name = str; + + if (oneByteOver) + { + customer.Name += 'x'; + var ex = await Assert.ThrowsAsync(() => context.SaveChangesAsync()); + Assert.IsType(ex.InnerException); + } + else + { + await context.SaveChangesAsync(); + + using var assertContext = contextFactory.CreateContext(); + var dbCustomer = await assertContext.Customers.FirstAsync(); + Assert.Equal(dbCustomer.Name, str); + } + } + + [ConditionalTheory, InlineData(true), InlineData(false)] + public virtual async Task SaveChanges_update_id_contains_special_chars_which_makes_request_larger_than_2_mib_splits_into_2_batches(bool isIdSpecialChar) + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + + string id1 = isIdSpecialChar ? new string('€', 341) : new string('x', 341); + string id2 = isIdSpecialChar ? new string('Ω', 341) : new string('y', 341); + + var customer1 = new Customer { Id = id1, PartitionKey = new string('€', 341) }; + var customer2 = new Customer { Id = id2, PartitionKey = new string('€', 341) }; + + context.Customers.Add(customer1); + context.Customers.Add(customer2); + + await context.SaveChangesAsync(); + ListLoggerFactory.Clear(); + + customer1.Name = new string('x', 1046358); + customer2.Name = new string('x', 1046358); + + await context.SaveChangesAsync(); + using var assertContext = contextFactory.CreateContext(); + Assert.Equal(2, (await context.Customers.ToListAsync()).Count); + + // The id being a special character should make the difference whether this fits in 1 batch. + if (isIdSpecialChar) + { + Assert.Equal(2, ListLoggerFactory.Log.Count(x => x.Id == CosmosEventId.ExecutedTransactionalBatch)); + } + else + { + Assert.Equal(1, ListLoggerFactory.Log.Count(x => x.Id == CosmosEventId.ExecutedTransactionalBatch)); + } + } + + [ConditionalTheory, InlineData(true), InlineData(false)] + public virtual async Task SaveChanges_transaction_behaviour_always_update_entities_payload_can_be_exactly_cosmos_limit_and_throws_when_1byte_over(bool oneByteOver) + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Always; + + var customer1 = new Customer { Id = new string('x', 1_023), PartitionKey = new string('x', 1_023) }; + var customer2 = new Customer { Id = new string('y', 1_023), PartitionKey = new string('x', 1_023) }; + + context.Customers.Add(customer1); + context.Customers.Add(customer2); + + await context.SaveChangesAsync(); + + customer1.Name = new string('x', 1097582); + customer2.Name = new string('x', 1097583); + + if (oneByteOver) + { + customer1.Name += 'x'; + customer2.Name += 'x'; + await Assert.ThrowsAsync(() => context.SaveChangesAsync()); + } + else + { + await context.SaveChangesAsync(); + } + } + + [ConditionalTheory, InlineData(true), InlineData(false)] + public virtual async Task SaveChanges_id_counts_double_toward_request_size_on_update(bool oneByteOver) + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Always; + + var customer1 = new Customer { Id = new string('x', 1), PartitionKey = new string('x', 1_023) }; + var customer2 = new Customer { Id = new string('y', 1_023), PartitionKey = new string('x', 1_023) }; + + context.Customers.Add(customer1); + context.Customers.Add(customer2); + + await context.SaveChangesAsync(); + + customer1.Name = new string('x', 1097582 + 1_022 * 2 + 1); + customer2.Name = new string('x', 1097583); + + if (oneByteOver) + { + customer1.Name += 'x'; + customer2.Name += 'x'; + await Assert.ThrowsAsync(() => context.SaveChangesAsync()); + } + else + { + await context.SaveChangesAsync(); + } + } + + [ConditionalTheory, InlineData(true), InlineData(false)] + public virtual async Task SaveChanges_transaction_behaviour_always_create_entities_payload_can_be_exactly_cosmos_limit_and_throws_when_1byte_over(bool oneByteOver) + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Always; + + var customer1 = new Customer { Id = new string('x', 1_023), Name = new string('x', 1098841), PartitionKey = new string('x', 1_023) }; + var customer2 = new Customer { Id = new string('y', 1_023), Name = new string('x', 1098841), PartitionKey = new string('x', 1_023) }; + if (oneByteOver) + { + customer1.Name += 'x'; + customer2.Name += 'x'; + } + + context.Customers.Add(customer1); + context.Customers.Add(customer2); + if (oneByteOver) + { + await Assert.ThrowsAsync(() => context.SaveChangesAsync()); + } + else + { + await context.SaveChangesAsync(); + } + } + + [ConditionalTheory, InlineData(true), InlineData(false)] + public virtual async Task SaveChanges_id_does_not_count_double_toward_request_size_on_create(bool oneByteOver) + { + var contextFactory = await InitializeAsync(); + + using var context = contextFactory.CreateContext(); + context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Always; + + var customer1 = new Customer { Id = new string('x', 1), Name = new string('x', 1098841 + 1_022), PartitionKey = new string('x', 1_023) }; + var customer2 = new Customer { Id = new string('y', 1_023), Name = new string('x', 1098841), PartitionKey = new string('x', 1_023) }; + if (oneByteOver) + { + customer1.Name += 'x'; + customer2.Name += 'x'; + } + + context.Customers.Add(customer1); + context.Customers.Add(customer2); + if (oneByteOver) + { + await Assert.ThrowsAsync(() => context.SaveChangesAsync()); + } + else + { + await context.SaveChangesAsync(); + } + } + + [ConditionalFact] + public async Task SaveChanges_transaction_behaviour_never_does_not_use_transactions() + { + var contextFactory = await InitializeAsync(); + + TransactionalBatchContext CreateContext() + { + var context = contextFactory.CreateContext(); + context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Never; + return context; + } + + var customers = new Customer[] { new Customer { Id = "42", Name = "Theon", PartitionKey = "1" }, new Customer { Id = "43", Name = "Rob", PartitionKey = "1" } }; + + using (var context = CreateContext()) + { + ListLoggerFactory.Clear(); + + context.AddRange(customers); + + await context.SaveChangesAsync(); + + var logEntries = ListLoggerFactory.Log.Where(e => e.Id == CosmosEventId.ExecutedCreateItem).ToList(); + Assert.Equal(2, logEntries.Count); + foreach (var logEntry in logEntries) + { + Assert.Equal(LogLevel.Information, logEntry.Level); + Assert.Contains("CreateItem", logEntry.Message); + } + } + + using (var context = CreateContext()) + { + ListLoggerFactory.Clear(); + var customerFromStore1 = await context.Set().FirstAsync(x => x.Id == "42"); + var customerFromStore2 = await context.Set().LastAsync(x => x.Id == "43"); + + customerFromStore1.Name += " Greyjoy"; + customerFromStore2.Name += " Stark"; + + await context.SaveChangesAsync(); + + var logEntries = ListLoggerFactory.Log.Where(e => e.Id == CosmosEventId.ExecutedReplaceItem).ToList(); + Assert.Equal(2, logEntries.Count); + foreach (var logEntry in logEntries) + { + Assert.Equal(LogLevel.Information, logEntry.Level); + Assert.Contains("ReplaceItem", logEntry.Message); + } + } + + using (var context = CreateContext()) + { + ListLoggerFactory.Clear(); + var customerFromStore1 = await context.Set().FirstAsync(x => x.Id == "42"); + var customerFromStore2 = await context.Set().LastAsync(x => x.Id == "43"); + + Assert.Equal("42", customerFromStore1.Id); + Assert.Equal("Theon Greyjoy", customerFromStore1.Name); + + Assert.Equal("43", customerFromStore2.Id); + Assert.Equal("Rob Stark", customerFromStore2.Name); + + context.Remove(customerFromStore1); + context.Remove(customerFromStore2); + + await context.SaveChangesAsync(); + + var logEntries = ListLoggerFactory.Log.Where(e => e.Id == CosmosEventId.ExecutedDeleteItem).ToList(); + Assert.Equal(2, logEntries.Count); + foreach (var logEntry in logEntries) + { + Assert.Equal(LogLevel.Information, logEntry.Level); + Assert.Contains("DeleteItem", logEntry.Message); + } + } + + using (var context = CreateContext()) + { + ListLoggerFactory.Clear(); + Assert.Empty(await context.Set().ToListAsync()); + } + } + public class TransactionalBatchContext(DbContextOptions options) : PoolableDbContext(options) { public DbSet Customers { get; set; } = null!; diff --git a/test/EFCore.Cosmos.FunctionalTests/EndToEndCosmosTest.cs b/test/EFCore.Cosmos.FunctionalTests/EndToEndCosmosTest.cs index 4aac87e851d..00feda63750 100644 --- a/test/EFCore.Cosmos.FunctionalTests/EndToEndCosmosTest.cs +++ b/test/EFCore.Cosmos.FunctionalTests/EndToEndCosmosTest.cs @@ -1848,9 +1848,9 @@ private TContext CreateContext(ContextFactory factory, bool where TContext : DbContext { var context = factory.CreateContext(); - if (!transactionalBatch) + if (transactionalBatch) { - context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Never; + context.Database.AutoTransactionBehavior = AutoTransactionBehavior.Always; } return context; } diff --git a/test/EFCore.Specification.Tests/BulkUpdates/NorthwindBulkUpdatesTestBase.cs b/test/EFCore.Specification.Tests/BulkUpdates/NorthwindBulkUpdatesTestBase.cs index a4bcb42a346..3df4cc1e7c3 100644 --- a/test/EFCore.Specification.Tests/BulkUpdates/NorthwindBulkUpdatesTestBase.cs +++ b/test/EFCore.Specification.Tests/BulkUpdates/NorthwindBulkUpdatesTestBase.cs @@ -370,7 +370,7 @@ public virtual Task Update_set_constant_TagWith_null(bool async) e => e, s => s.SetProperty(c => c.ContactName, (string)null), rowsAffectedCount: 91, - (b, a) => Assert.All(a, c => Assert.Equal(null, c.ContactName))); + (b, a) => Assert.All(a, c => Assert.Null(c.ContactName))); [ConditionalTheory, MemberData(nameof(IsAsyncData))] public virtual Task Update_Where_set_constant(bool async)