diff --git a/src/test/java/com/pivovarit/collectors/parallelToListOrdered/ToListOrderedParallelismThrottlingBDDTest.java b/src/test/java/com/pivovarit/collectors/parallelToListOrdered/ToListOrderedParallelismThrottlingBDDTest.java index fde34010..60305e75 100644 --- a/src/test/java/com/pivovarit/collectors/parallelToListOrdered/ToListOrderedParallelismThrottlingBDDTest.java +++ b/src/test/java/com/pivovarit/collectors/parallelToListOrdered/ToListOrderedParallelismThrottlingBDDTest.java @@ -36,7 +36,7 @@ public class ToListOrderedParallelismThrottlingBDDTest extends ExecutorAwareTest private static final long CONSTANT_DELAY = 100; @Property(trials = TRIALS) - public void shouldCollectToListWithThrottledParallelism(@InRange(minInt = 2, maxInt = 20) int unitsOfWork, @InRange(minInt = 1, maxInt = 40) int parallelism) { + public void shouldCollectToListOrderedWithThrottledParallelism(@InRange(minInt = 2, maxInt = 20) int unitsOfWork, @InRange(minInt = 1, maxInt = 40) int parallelism) { // given executor = threadPoolExecutor(unitsOfWork); long expectedDuration = expectedDuration(parallelism, unitsOfWork, BLOCKING_MILLIS); diff --git a/src/test/java/com/pivovarit/collectors/parallelToMap/ToMapParallelismThrottlingBDDTest.java b/src/test/java/com/pivovarit/collectors/parallelToMap/ToMapParallelismThrottlingBDDTest.java new file mode 100644 index 00000000..01496582 --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/parallelToMap/ToMapParallelismThrottlingBDDTest.java @@ -0,0 +1,60 @@ +package com.pivovarit.collectors.parallelToMap; + +import com.pholser.junit.quickcheck.Property; +import com.pholser.junit.quickcheck.generator.InRange; +import com.pholser.junit.quickcheck.runner.JUnitQuickcheck; +import com.pivovarit.collectors.infrastructure.ExecutorAwareTest; +import org.junit.runner.RunWith; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import java.util.stream.Collector; +import java.util.stream.Stream; + +import static com.pivovarit.collectors.ParallelCollectors.parallelToMap; +import static com.pivovarit.collectors.infrastructure.TestUtils.TRIALS; +import static com.pivovarit.collectors.infrastructure.TestUtils.expectedDuration; +import static com.pivovarit.collectors.infrastructure.TestUtils.returnWithDelay; +import static com.pivovarit.collectors.infrastructure.TestUtils.timed; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.data.Offset.offset; + +/** + * @author Grzegorz Piwowarek + */ +@RunWith(JUnitQuickcheck.class) +public class ToMapParallelismThrottlingBDDTest extends ExecutorAwareTest { + + private static final long BLOCKING_MILLIS = 50; + private static final long CONSTANT_DELAY = 100; + + @Property(trials = TRIALS) + public void shouldCollectToMapWithThrottledParallelism(@InRange(minInt = 2, maxInt = 20) int unitsOfWork, @InRange(minInt = 1, maxInt = 40) int parallelism) { + // given + executor = threadPoolExecutor(unitsOfWork); + long expectedDuration = expectedDuration(parallelism, unitsOfWork, BLOCKING_MILLIS); + + Map.Entry, Long> result = timed(collectWith(f -> parallelToMap(f, i -> ThreadLocalRandom.current().nextLong(), executor, parallelism), unitsOfWork)); + + assertThat(result) + .satisfies(e -> { + assertThat(e.getValue()) + .isGreaterThanOrEqualTo(expectedDuration) + .isCloseTo(expectedDuration, offset(CONSTANT_DELAY)); + + assertThat(e.getKey()).hasSize(unitsOfWork); + }); + } + + private static > Supplier collectWith(Function, Collector>> collector, int unitsOfWork) { + return () -> Stream.generate(() -> 42L) + .limit(unitsOfWork) + .collect(collector.apply(f -> returnWithDelay(ThreadLocalRandom.current().nextLong(), Duration.ofMillis(BLOCKING_MILLIS)))) + .join(); + } +} diff --git a/src/test/java/com/pivovarit/collectors/parallelToMap/ToMapParallelismThrottlingTest.java b/src/test/java/com/pivovarit/collectors/parallelToMap/ToMapParallelismThrottlingTest.java new file mode 100644 index 00000000..fa8b6581 --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/parallelToMap/ToMapParallelismThrottlingTest.java @@ -0,0 +1,62 @@ +package com.pivovarit.collectors.parallelToMap; + +import com.pivovarit.collectors.infrastructure.TestUtils; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import static com.pivovarit.collectors.ParallelCollectors.parallelToList; +import static com.pivovarit.collectors.ParallelCollectors.parallelToMap; +import static com.pivovarit.collectors.ParallelCollectors.supplier; +import static com.pivovarit.collectors.infrastructure.TestUtils.returnWithDelay; +import static java.time.Duration.ofMillis; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Grzegorz Piwowarek + */ +class ToMapParallelismThrottlingTest { + + @Test + void shouldParallelizeToMapAndRespectParallelizm() throws InterruptedException { + // given + int parallelism = 2; + TestUtils.CountingExecutor executor = new TestUtils.CountingExecutor(); + + CompletableFuture> result = + Stream.generate(() -> 42) + .limit(10) + .collect(parallelToMap(i -> returnWithDelay(42L, ofMillis(Integer.MAX_VALUE)), i -> returnWithDelay(42L, ofMillis(Integer.MAX_VALUE)), executor, parallelism)); + + assertThat(result) + .isNotCompleted() + .isNotCancelled(); + + Thread.sleep(50); + assertThat(executor.count()).isEqualTo(parallelism); + } + + + @Test + void shouldParallelizeToMapAndRespectParallelizmMapping() throws InterruptedException { + // given + int parallelism = 2; + TestUtils.CountingExecutor executor = new TestUtils.CountingExecutor(); + + + CompletableFuture> result = + Stream.generate(() -> 42) + .limit(10) + .collect(parallelToMap(i -> returnWithDelay(42L, ofMillis(Integer.MAX_VALUE)),i -> returnWithDelay(42L, ofMillis(Integer.MAX_VALUE)) , executor, parallelism)); + + assertThat(result) + .isNotCompleted() + .isNotCancelled(); + + Thread.sleep(50); + assertThat(executor.count()).isEqualTo(parallelism); + } +}