Skip to content
42 changes: 23 additions & 19 deletions src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ public CosmosClientWrapper(
_enableContentResponseOnWrite = options.EnableContentResponseOnWrite;
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public static Stream Serialize(JToken document)
{
var stream = new MemoryStream();
using var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: true);

using var jsonWriter = new JsonTextWriter(writer);
CosmosClientWrapper.Serializer.Serialize(jsonWriter, document);
jsonWriter.Flush();
return stream;
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
Expand Down Expand Up @@ -401,14 +418,7 @@ private static async Task<bool> CreateItemOnceAsync(
(string ContainerId, JToken Document, IUpdateEntry Entry, CosmosClientWrapper Wrapper) parameters,
CancellationToken cancellationToken = default)
{
var stream = new MemoryStream();
await using var __ = stream.ConfigureAwait(false);
var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: false);
await using var ___ = writer.ConfigureAwait(false);

using var jsonWriter = new JsonTextWriter(writer);
Serializer.Serialize(jsonWriter, parameters.Document);
await jsonWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
using var stream = Serialize(parameters.Document);

var entry = parameters.Entry;
var wrapper = parameters.Wrapper;
Expand Down Expand Up @@ -493,13 +503,7 @@ private static async Task<bool> ReplaceItemOnceAsync(
(string ContainerId, string ResourceId, JObject Document, IUpdateEntry Entry, CosmosClientWrapper Wrapper) parameters,
CancellationToken cancellationToken = default)
{
var stream = new MemoryStream();
await using var __ = stream.ConfigureAwait(false);
var writer = new StreamWriter(stream, new UTF8Encoding(), bufferSize: 1024, leaveOpen: false);
await using var ___ = writer.ConfigureAwait(false);
using var jsonWriter = new JsonTextWriter(writer);
Serializer.Serialize(jsonWriter, parameters.Document);
await jsonWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
using var stream = Serialize(parameters.Document);

var entry = parameters.Entry;
var wrapper = parameters.Wrapper;
Expand Down Expand Up @@ -592,12 +596,12 @@ public virtual PartitionKey GetPartitionKeyValue(IUpdateEntry updateEntry)
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string containerId, PartitionKey partitionKeyValue)
public virtual ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string containerId, PartitionKey partitionKeyValue, bool checkSize)
{
var container = Client.GetDatabase(_databaseId).GetContainer(containerId);
var batch = container.CreateTransactionalBatch(partitionKeyValue);

return new CosmosTransactionalBatchWrapper(batch, containerId, partitionKeyValue, _enableContentResponseOnWrite);
return new CosmosTransactionalBatchWrapper(batch, containerId, partitionKeyValue, checkSize, _enableContentResponseOnWrite);
}

/// <summary>
Expand All @@ -606,7 +610,7 @@ public virtual ICosmosTransactionalBatchWrapper CreateTransactionalBatch(string
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual CosmosTransactionalBatchResult ExecuteBatch(ICosmosTransactionalBatchWrapper batch)
public virtual CosmosTransactionalBatchResult ExecuteTransactionalBatch(ICosmosTransactionalBatchWrapper batch)
{
_databaseLogger.SyncNotSupported();

Expand All @@ -623,7 +627,7 @@ private static CosmosTransactionalBatchResult ExecuteBatchOnce(DbContext _,
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual Task<CosmosTransactionalBatchResult> ExecuteBatchAsync(ICosmosTransactionalBatchWrapper batch, CancellationToken cancellationToken = default)
public virtual Task<CosmosTransactionalBatchResult> ExecuteTransactionalBatchAsync(ICosmosTransactionalBatchWrapper batch, CancellationToken cancellationToken = default)
=> _executionStrategy.ExecuteAsync((batch, this), ExecuteBatchOnceAsync, null, cancellationToken);

private static async Task<CosmosTransactionalBatchResult> ExecuteBatchOnceAsync(DbContext _,
Expand Down
Loading