Skip to content

IAsyncEnumerable on DelayedQuery #364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
using NUnit.Framework;
using System.Linq;
using System.Threading.Tasks;
using Xtensive.Core;
using Xtensive.Orm.Internals.Prefetch;
using Xtensive.Orm.Services;
using Xtensive.Orm.Tests;

Expand Down Expand Up @@ -44,7 +42,7 @@ public class Person : Entity

public Person(Session session)
: base(session)
{}
{ }
}

#endregion
Expand All @@ -63,10 +61,10 @@ protected override Configuration.DomainConfiguration BuildConfiguration()
[TearDown]
public void ClearContent()
{
using(var session = Domain.OpenSession())
using(var tx = session.OpenTransaction()) {
using (var session = Domain.OpenSession())
using (var tx = session.OpenTransaction()) {
var people = session.Query.All<Person>().ToList();
foreach(var person in people) {
foreach (var person in people) {
person.Manager = null;
}
session.SaveChanges();
Expand Down Expand Up @@ -247,7 +245,7 @@ public async Task MultipleBatchesAsyncTest()
var count = 1000;

await using (var session = await Domain.OpenSessionAsync())
using (var transactionScope = session.OpenTransaction()){
using (var transactionScope = session.OpenTransaction()) {
var people = new Person[count];
for (var i = 0; i < count; i++) {
people[i] = new Person(session) { Name = i.ToString(), Photo = new[] { (byte) (i % 256) } };
Expand Down
41 changes: 40 additions & 1 deletion Orm/Xtensive.Orm.Tests/Storage/Prefetch/PrefetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
using Xtensive.Orm.Configuration;
using Xtensive.Orm.Internals;
using Xtensive.Orm.Internals.Prefetch;
using Xtensive.Orm.Model;
using Xtensive.Orm.Services;
using Xtensive.Orm.Tests.ObjectModel;
using Xtensive.Orm.Tests.ObjectModel.ChinookDO;
using FieldInfo = Xtensive.Orm.Model.FieldInfo;

namespace Xtensive.Orm.Tests.Storage.Prefetch
{
Expand Down Expand Up @@ -923,6 +924,44 @@ public async Task StructurePrefetchAsyncTest()
}
}

[Test]
public void PrefetchOnDelayedQueryTest()
{
using var session = Domain.OpenSession();
var sessionAccessor = session.Services.Get<DirectSessionAccessor>();
using var moq = new QueryCounterSessionHandlerMock(session.Handler);
using (sessionAccessor.ChangeSessionHandler(moq))
using (var transactionScope = session.OpenTransaction()) {
var prefetcher = session.Query.CreateDelayedQuery(q => q.All<Invoice>())
.Prefetch(o => o.ProcessingTime);
foreach (var invoice in prefetcher.AsEnumerable()) {
// some code here...
}
transactionScope.Complete();
}
Assert.That(moq.GetSyncCounter(), Is.GreaterThan(0));
Assert.That(moq.GetAsyncCounter(), Is.EqualTo(0));
}

[Test]
public async Task PrefetchOnDelayedQueryAsyncTest()
{
await using var session = await Domain.OpenSessionAsync();
var sessionAccessor = session.Services.Get<DirectSessionAccessor>();
await using var moq = new QueryCounterSessionHandlerMock(session.Handler);
using (sessionAccessor.ChangeSessionHandler(moq))
await using (var transactionScope = await session.OpenTransactionAsync()) {
var prefetcher = session.Query.CreateDelayedQuery(q => q.All<Invoice>())
.Prefetch(o => o.ProcessingTime);
await foreach (var invoice in prefetcher.AsAsyncEnumerable()) {
// some code here...
}
transactionScope.Complete();
}
Assert.That(moq.GetSyncCounter(), Is.EqualTo(0));
Assert.That(moq.GetAsyncCounter(), Is.GreaterThan(0));
}

private void RemoveAllBooks()
{
using (var session = Domain.OpenSession())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xtensive.Orm.Internals;
using Xtensive.Orm.Providers;

namespace Xtensive.Orm.Tests.Storage.Prefetch
{
internal class QueryCounterSessionHandlerMock : ChainingSessionHandler
{
private volatile int syncCounter;
private volatile int asyncCounter;

public int GetSyncCounter() => syncCounter;

public int GetAsyncCounter() => asyncCounter;

public QueryCounterSessionHandlerMock(SessionHandler chainedHandler) : base(chainedHandler)
{
}

public override void ExecuteQueryTasks(IEnumerable<QueryTask> queryTasks, bool allowPartialExecution)
{
_ = Interlocked.Increment(ref syncCounter);
base.ExecuteQueryTasks(queryTasks, allowPartialExecution);
}

public override Task ExecuteQueryTasksAsync(IEnumerable<QueryTask> queryTasks, bool allowPartialExecution, CancellationToken token)
{
_ = Interlocked.Increment(ref asyncCounter);
return base.ExecuteQueryTasksAsync(queryTasks, allowPartialExecution, token);
}
}
}
14 changes: 12 additions & 2 deletions Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,23 @@ namespace Xtensive.Orm
/// </summary>
/// <typeparam name="TElement">The type of the element in a resulting sequence.</typeparam>
[Serializable]
public sealed class DelayedQuery<TElement> : DelayedQuery, IEnumerable<TElement>
public sealed class DelayedQuery<TElement> : DelayedQuery, IEnumerable<TElement>, IAsyncEnumerable<TElement>
{
/// <inheritdoc/>
public IEnumerator<TElement> GetEnumerator() => Materialize<TElement>().GetEnumerator();

/// <inheritdoc/>
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

/// <inheritdoc/>
async IAsyncEnumerator<TElement> IAsyncEnumerable<TElement>.GetAsyncEnumerator(CancellationToken token)
{
var elements = await ExecuteAsync(token).ConfigureAwait(false);
foreach (var element in elements) {
yield return element;
}
}

/// <summary>
/// Asynchronously executes delayed query.
/// </summary>
Expand All @@ -43,6 +52,7 @@ public ValueTask<QueryResult<TElement>> ExecuteAsync(CancellationToken token = d

internal DelayedQuery(Session session, TranslatedQuery translatedQuery, ParameterContext parameterContext)
: base(session, translatedQuery, parameterContext)
{}
{ }

}
}