Skip to content

Commit

Permalink
Add additional toMap tests (#218)
Browse files Browse the repository at this point in the history
* Add ToMap tests

* Add missing tests
  • Loading branch information
pivovarit authored Mar 29, 2019
1 parent 2fff3f9 commit 38657c2
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ToListOrderedParallelismThrottlingBDDTest extends ExecutorAwareTest
private static final long CONSTANT_DELAY = 100;

@Property(trials = TRIALS)
public void shouldCollectToListWithThrottledParallelism(@InRange(minInt = 2, maxInt = 20) int unitsOfWork, @InRange(minInt = 1, maxInt = 40) int parallelism) {
public void shouldCollectToListOrderedWithThrottledParallelism(@InRange(minInt = 2, maxInt = 20) int unitsOfWork, @InRange(minInt = 1, maxInt = 40) int parallelism) {
// given
executor = threadPoolExecutor(unitsOfWork);
long expectedDuration = expectedDuration(parallelism, unitsOfWork, BLOCKING_MILLIS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.pivovarit.collectors.parallelToMap;

import com.pholser.junit.quickcheck.Property;
import com.pholser.junit.quickcheck.generator.InRange;
import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
import com.pivovarit.collectors.infrastructure.ExecutorAwareTest;
import org.junit.runner.RunWith;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.ParallelCollectors.parallelToMap;
import static com.pivovarit.collectors.infrastructure.TestUtils.TRIALS;
import static com.pivovarit.collectors.infrastructure.TestUtils.expectedDuration;
import static com.pivovarit.collectors.infrastructure.TestUtils.returnWithDelay;
import static com.pivovarit.collectors.infrastructure.TestUtils.timed;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.data.Offset.offset;

/**
* @author Grzegorz Piwowarek
*/
@RunWith(JUnitQuickcheck.class)
public class ToMapParallelismThrottlingBDDTest extends ExecutorAwareTest {

private static final long BLOCKING_MILLIS = 50;
private static final long CONSTANT_DELAY = 100;

@Property(trials = TRIALS)
public void shouldCollectToMapWithThrottledParallelism(@InRange(minInt = 2, maxInt = 20) int unitsOfWork, @InRange(minInt = 1, maxInt = 40) int parallelism) {
// given
executor = threadPoolExecutor(unitsOfWork);
long expectedDuration = expectedDuration(parallelism, unitsOfWork, BLOCKING_MILLIS);

Map.Entry<Map<Long, Long>, Long> result = timed(collectWith(f -> parallelToMap(f, i -> ThreadLocalRandom.current().nextLong(), executor, parallelism), unitsOfWork));

assertThat(result)
.satisfies(e -> {
assertThat(e.getValue())
.isGreaterThanOrEqualTo(expectedDuration)
.isCloseTo(expectedDuration, offset(CONSTANT_DELAY));

assertThat(e.getKey()).hasSize(unitsOfWork);
});
}

private static <R extends Map<Long, Long>> Supplier<R> collectWith(Function<UnaryOperator<Long>, Collector<Long, ?, CompletableFuture<R>>> collector, int unitsOfWork) {
return () -> Stream.generate(() -> 42L)
.limit(unitsOfWork)
.collect(collector.apply(f -> returnWithDelay(ThreadLocalRandom.current().nextLong(), Duration.ofMillis(BLOCKING_MILLIS))))
.join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.pivovarit.collectors.parallelToMap;

import com.pivovarit.collectors.infrastructure.TestUtils;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static com.pivovarit.collectors.ParallelCollectors.parallelToList;
import static com.pivovarit.collectors.ParallelCollectors.parallelToMap;
import static com.pivovarit.collectors.ParallelCollectors.supplier;
import static com.pivovarit.collectors.infrastructure.TestUtils.returnWithDelay;
import static java.time.Duration.ofMillis;
import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Grzegorz Piwowarek
*/
class ToMapParallelismThrottlingTest {

@Test
void shouldParallelizeToMapAndRespectParallelizm() throws InterruptedException {
// given
int parallelism = 2;
TestUtils.CountingExecutor executor = new TestUtils.CountingExecutor();

CompletableFuture<Map<Long, Long>> result =
Stream.generate(() -> 42)
.limit(10)
.collect(parallelToMap(i -> returnWithDelay(42L, ofMillis(Integer.MAX_VALUE)), i -> returnWithDelay(42L, ofMillis(Integer.MAX_VALUE)), executor, parallelism));

assertThat(result)
.isNotCompleted()
.isNotCancelled();

Thread.sleep(50);
assertThat(executor.count()).isEqualTo(parallelism);
}


@Test
void shouldParallelizeToMapAndRespectParallelizmMapping() throws InterruptedException {
// given
int parallelism = 2;
TestUtils.CountingExecutor executor = new TestUtils.CountingExecutor();


CompletableFuture<Map<Long, Long>> result =
Stream.generate(() -> 42)
.limit(10)
.collect(parallelToMap(i -> returnWithDelay(42L, ofMillis(Integer.MAX_VALUE)),i -> returnWithDelay(42L, ofMillis(Integer.MAX_VALUE)) , executor, parallelism));

assertThat(result)
.isNotCompleted()
.isNotCancelled();

Thread.sleep(50);
assertThat(executor.count()).isEqualTo(parallelism);
}
}

0 comments on commit 38657c2

Please sign in to comment.