From f8a17972f871489ee774994b2878f2463c7894f7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=AD=8F=E6=9D=B0?= <16899918+JasonWei512@users.noreply.github.com>
Date: Mon, 9 May 2022 11:21:41 +0800
Subject: [PATCH 1/2] Add a "ChunkAsync" extension method for IAsyncEnumerable

---
 .../System/Linq/Operators/Chunk.cs | 88 +++++++++++++++++++
 .../System/Linq/Operators/Chunk.cs | 73 +++++++++++++++
 2 files changed, 161 insertions(+)
 create mode 100644 Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/Chunk.cs
 create mode 100644 Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs

diff --git a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/Chunk.cs b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/Chunk.cs
new file mode 100644
index 0000000000..837e2ae4bf
--- /dev/null
+++ b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/Chunk.cs
@@ -0,0 +1,88 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace Tests
+{
+    public class Chunk : AsyncEnumerableTests
+    {
+        [Fact]
+        public async Task Chunk_Null()
+        {
+            await Assert.ThrowsAsync<ArgumentNullException>(async () =>
+            {
+                IAsyncEnumerable<int>? xs = null;
+                var ys = xs!.ChunkAsync(24);
+
+                var e = ys.GetAsyncEnumerator();
+                await e.MoveNextAsync();
+            });
+        }
+
+        [Theory]
+        [InlineData(0)]
+        [InlineData(-1)]
+        public async Task Chunk_NonPositiveSize(int size)
+        {
+            await Assert.ThrowsAsync<ArgumentOutOfRangeException>("size", async () =>
+            {
+                var xs = new[] { 24 }.ToAsyncEnumerable();
+                var ys = xs.ChunkAsync(size);
+
+                var e = ys.GetAsyncEnumerator();
+                await e.MoveNextAsync();
+            });
+        }
+
+        [Fact]
+        public async Task Chunk_Simple_Evenly()
+        {
+            var xs = new[] { 1, 1, 4, 5, 1, 4 }.ToAsyncEnumerable();
+            var ys = xs.ChunkAsync(3);
+
+            var e = ys.GetAsyncEnumerator();
+            await HasNextAsync(e, new[] { 1, 1, 4 });
+            await HasNextAsync(e, new[] { 5, 1, 4 });
+            await NoNextAsync(e);
+        }
+
+        [Fact]
+        public async Task Chunk_Simple_Unevenly()
+        {
+            var xs = new[] { 1, 9, 1, 9, 8, 1, 0 }.ToAsyncEnumerable();
+            var ys = xs.ChunkAsync(4);
+
+            var e = ys.GetAsyncEnumerator();
+            await HasNextAsync(e, new[] { 1, 9, 1, 9 });
+            await HasNextAsync(e, new[] { 8, 1, 0 });
+            await NoNextAsync(e);
+        }
+
+        [Fact]
+        public async Task Chunk_SourceSmallerThanChunkSize()
+        {
+            var xs = new[] { 8, 9, 3 }.ToAsyncEnumerable();
+            var ys = xs.ChunkAsync(4);
+
+            var e = ys.GetAsyncEnumerator();
+            await HasNextAsync(e, new[] { 8, 9, 3 });
+            await NoNextAsync(e);
+        }
+
+        [Fact]
+        public async Task Chunk_EmptySource()
+        {
+            var xs = new int[0].ToAsyncEnumerable();
+            var ys = xs.ChunkAsync(24);
+
+            var e = ys.GetAsyncEnumerator();
+            await NoNextAsync(e);
+        }
+    }
+}
diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs
new file mode 100644
index 0000000000..555e44dc7b
--- /dev/null
+++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs
@@ -0,0 +1,73 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information.
+
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Linq
+{
+    public static partial class AsyncEnumerable
+    {
+        /// <summary>
+        /// Split the elements of an async-enumerable sequence into chunks of size at most <paramref name="size"/>.
+        /// </summary>
+        /// <remarks>
+        /// Every chunk except the last will be of size <paramref name="size"/>.
+        /// The last chunk will contain the remaining elements and may be of a smaller size.
+        /// </remarks>
+        /// <param name="source">
+        /// An <see cref="IAsyncEnumerable{T}"/> whose elements to chunk.
+        /// </param>
+        /// <param name="size">
+        /// Maximum size of each chunk.
+        /// </param>
+        /// <param name="cancellationToken">The optional cancellation token to be used for cancelling the sequence at any time.</param>
+        /// <typeparam name="TSource">
+        /// The type of the elements of source.
+        /// </typeparam>
+        /// <returns>
+        /// An <see cref="IAsyncEnumerable{T}"/> that contains the elements the input sequence split into chunks of size <paramref name="size"/>.
+        /// </returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source"/> is null.
+        /// </exception>
+        /// <exception cref="ArgumentOutOfRangeException">
+        /// <paramref name="size"/> is below 1.
+        /// </exception>
+        public static async IAsyncEnumerable<TSource[]> ChunkAsync<TSource>(this IAsyncEnumerable<TSource> source, int size, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+
+            if (size < 1)
+                throw Error.ArgumentOutOfRange(nameof(size));
+
+            await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
+
+            if (await e.MoveNextAsync())
+            {
+                var chunkBuilder = new List<TSource>();
+                while (true)
+                {
+                    do
+                    {
+                        chunkBuilder.Add(e.Current);
+                    }
+                    while (chunkBuilder.Count < size && await e.MoveNextAsync());
+
+                    yield return chunkBuilder.ToArray();
+
+                    if (chunkBuilder.Count < size || !await e.MoveNextAsync())
+                    {
+                        yield break;
+                    }
+                    chunkBuilder.Clear();
+                }
+            }
+        }
+    }
+}

From e2f29d576cd798b5cd2d657c7630c71b1e01be40 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=AD=8F=E6=9D=B0?= <16899918+JasonWei512@users.noreply.github.com>
Date: Thu, 25 Aug 2022 14:04:29 +0800
Subject: [PATCH 2/2] Optimize AsyncEnumerable.ChunkAsync

Keep up-to-date with
https://github.com/dotnet/runtime/blob/8ec62773f573f0915f2b6e1eea306fd4a84f4eb4/src/libraries/System.Linq/src/System/Linq/Chunk.cs
---
 .../System/Linq/Operators/Chunk.cs | 51 +++++++++++++++----
 1 file changed, 41 insertions(+), 10 deletions(-)

diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs
index 555e44dc7b..f3c83dbd54 100644
--- a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs
+++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs
@@ -48,25 +48,56 @@ public static async IAsyncEnumerable<TSource[]> ChunkAsync<TSource>(this IAsyncE
 
             await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
+            // Before allocating anything, make sure there's at least one element.
             if (await e.MoveNextAsync())
             {
-                var chunkBuilder = new List<TSource>();
-                while (true)
+                // Now that we know we have at least one item, allocate an initial storage array. This is not
+                // the array we'll yield. It starts out small in order to avoid significantly overallocating
+                // when the source has many fewer elements than the chunk size.
+                var arraySize = Math.Min(size, 4);
+                int i;
+                do
                 {
-                    do
+                    var array = new TSource[arraySize];
+
+                    // Store the first item.
+                    array[0] = e.Current;
+                    i = 1;
+
+                    if (size != array.Length)
                     {
-                        chunkBuilder.Add(e.Current);
-                    }
-                    while (chunkBuilder.Count < size && await e.MoveNextAsync());
+                        // This is the first chunk. As we fill the array, grow it as needed.
+                        for (; i < size && await e.MoveNextAsync(); i++)
+                        {
+                            if (i >= array.Length)
+                            {
+                                arraySize = (int)Math.Min((uint)size, 2 * (uint)array.Length);
+                                Array.Resize(ref array, arraySize);
+                            }
 
-                    yield return chunkBuilder.ToArray();
+                            array[i] = e.Current;
+                        }
+                    }
+                    else
+                    {
+                        // For all but the first chunk, the array will already be correctly sized.
+                        // We can just store into it until either it's full or MoveNext returns false.
+                        var local = array; // avoid bounds checks by using cached local (`array` is lifted to iterator object as a field)
+                        Debug.Assert(local.Length == size);
+                        for (; (uint)i < (uint)local.Length && await e.MoveNextAsync(); i++)
+                        {
+                            local[i] = e.Current;
+                        }
+                    }
 
-                    if (chunkBuilder.Count < size || !await e.MoveNextAsync())
+                    if (i != array.Length)
                     {
-                        yield break;
+                        Array.Resize(ref array, i);
                     }
-                    chunkBuilder.Clear();
+
+                    yield return array;
                 }
+                while (i >= size && await e.MoveNextAsync());
             }
         }
     }