diff --git a/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/Chunk.cs b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/Chunk.cs new file mode 100644 index 0000000000..837e2ae4bf --- /dev/null +++ b/Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/Chunk.cs @@ -0,0 +1,88 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace Tests +{ + public class Chunk : AsyncEnumerableTests + { + [Fact] + public async Task Chunk_Null() + { + await Assert.ThrowsAsync(async () => + { + IAsyncEnumerable? xs = null; + var ys = xs!.ChunkAsync(24); + + var e = ys.GetAsyncEnumerator(); + await e.MoveNextAsync(); + }); + } + + [Theory] + [InlineData(0)] + [InlineData(-1)] + public async Task Chunk_NonPositiveSize(int size) + { + await Assert.ThrowsAsync("size", async () => + { + var xs = new[] { 24 }.ToAsyncEnumerable(); + var ys = xs.ChunkAsync(size); + + var e = ys.GetAsyncEnumerator(); + await e.MoveNextAsync(); + }); + } + + [Fact] + public async Task Chunk_Simple_Evenly() + { + var xs = new[] { 1, 1, 4, 5, 1, 4 }.ToAsyncEnumerable(); + var ys = xs.ChunkAsync(3); + + var e = ys.GetAsyncEnumerator(); + await HasNextAsync(e, new[] { 1, 1, 4 }); + await HasNextAsync(e, new[] { 5, 1, 4 }); + await NoNextAsync(e); + } + + [Fact] + public async Task Chunk_Simple_Unevenly() + { + var xs = new[] { 1, 9, 1, 9, 8, 1, 0 }.ToAsyncEnumerable(); + var ys = xs.ChunkAsync(4); + + var e = ys.GetAsyncEnumerator(); + await HasNextAsync(e, new[] { 1, 9, 1, 9 }); + await HasNextAsync(e, new[] { 8, 1, 0 }); + await NoNextAsync(e); + } + + [Fact] + public async Task Chunk_SourceSmallerThanChunkSize() + { + var xs = new[] { 8, 9, 3 }.ToAsyncEnumerable(); + var ys = xs.ChunkAsync(4); + + var e = ys.GetAsyncEnumerator(); + await HasNextAsync(e, new[] { 8, 9, 3 }); + await NoNextAsync(e); + } + + [Fact] + public async Task Chunk_EmptySource() + { + var xs = new int[0].ToAsyncEnumerable(); + var ys = xs.ChunkAsync(24); + + var e = ys.GetAsyncEnumerator(); + await NoNextAsync(e); + } + } +} diff --git a/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs new file mode 100644 index 0000000000..f3c83dbd54 --- /dev/null +++ b/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Chunk.cs @@ -0,0 +1,104 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Linq +{ + public static partial class AsyncEnumerable + { + /// + /// Split the elements of an async-enumerable sequence into chunks of size at most . + /// + /// + /// Every chunk except the last will be of size . + /// The last chunk will contain the remaining elements and may be of a smaller size. + /// + /// + /// An whose elements to chunk. + /// + /// + /// Maximum size of each chunk. + /// + /// The optional cancellation token to be used for cancelling the sequence at any time. + /// + /// The type of the elements of source. + /// + /// + /// An that contains the elements the input sequence split into chunks of size . + /// + /// + /// is null. + /// + /// + /// is below 1. + /// + public static async IAsyncEnumerable ChunkAsync(this IAsyncEnumerable source, int size, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + if (source == null) + throw Error.ArgumentNull(nameof(source)); + + if (size < 1) + throw Error.ArgumentOutOfRange(nameof(size)); + + await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false); + + // Before allocating anything, make sure there's at least one element. + if (await e.MoveNextAsync()) + { + // Now that we know we have at least one item, allocate an initial storage array. This is not + // the array we'll yield. It starts out small in order to avoid significantly overallocating + // when the source has many fewer elements than the chunk size. + var arraySize = Math.Min(size, 4); + int i; + do + { + var array = new TSource[arraySize]; + + // Store the first item. + array[0] = e.Current; + i = 1; + + if (size != array.Length) + { + // This is the first chunk. As we fill the array, grow it as needed. + for (; i < size && await e.MoveNextAsync(); i++) + { + if (i >= array.Length) + { + arraySize = (int)Math.Min((uint)size, 2 * (uint)array.Length); + Array.Resize(ref array, arraySize); + } + + array[i] = e.Current; + } + } + else + { + // For all but the first chunk, the array will already be correctly sized. + // We can just store into it until either it's full or MoveNext returns false. + var local = array; // avoid bounds checks by using cached local (`array` is lifted to iterator object as a field) + Debug.Assert(local.Length == size); + for (; (uint)i < (uint)local.Length && await e.MoveNextAsync(); i++) + { + local[i] = e.Current; + } + } + + if (i != array.Length) + { + Array.Resize(ref array, i); + } + + yield return array; + } + while (i >= size && await e.MoveNextAsync()); + } + } + } +}