v8.2.0 of cyclops: Lazy extended collections, Streaming utilities and more for Project Reactor
8.2.0 Release of Cyclops
Integration module enhancements
Reactor
- Lazy extended Collections
- Lazy persistent extended Collections
- Streaming Utilities - Pipes and FluxSource for 'pushable' Streams
- For comprehension syntax / API enhancement
- Native Monad Transformers for Monos
- Native for comprehensions for MonoT types
- Version bump to Reactor 3!
RxJava
- For comprehension syntax / API enhancement
Lazy extended Collections
Standard JDK collections
- LazyListX
- LazyDequeX
- LazyQueueX
- LazySetX
- LazySortedSetX
Persistent collections
- LazyPStackX (A persistent LinkedList)
- LazyPVectorX (A persistent Vector - an ArrayList analogue)
- LazyPQueueX (A persistent Queue)
- LazyPSetX (A persistent Set)
- LazyPOrderedSetX (A persistent OrderedSet)
- LazyPBagX (A persistent Bag)
Notes :
- Lazy collections cannot contain nulls (extended operations will throw a NullPointerException). Use ListX from cyclops-react for an extended List that can contain nulls.
- Data access and modification operations are eager; transformations are lazy.
- A lazy collection is not a Stream: eager operations materialize the entire collection (there is no short-circuiting, for example).
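The notes above can be illustrated with a minimal JDK-only sketch. LazySketch and LazySketchDemo are hypothetical names invented for this illustration; this is not how cyclops-reactor implements its lazy collections, only a small model of "transformations are lazy, data access is eager and materializes everything".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration only: NOT the cyclops-reactor implementation.
final class LazySketch<T> {
    private final Supplier<List<T>> source; // deferred pipeline
    private List<T> materialized;           // filled on first data access

    LazySketch(Supplier<List<T>> source) { this.source = source; }

    static <T> LazySketch<T> fromIterable(Iterable<T> it) {
        return new LazySketch<>(() -> {
            List<T> out = new ArrayList<>();
            it.forEach(out::add);
            return out;
        });
    }

    // Transformation: only records the work, nothing runs yet.
    <R> LazySketch<R> map(Function<? super T, ? extends R> fn) {
        return new LazySketch<>(() -> {
            List<R> out = new ArrayList<>();
            for (T t : materialize()) out.add(fn.apply(t));
            return out;
        });
    }

    // Data access: eager, runs the whole pipeline once and caches the result.
    T get(int index) { return materialize().get(index); }

    private List<T> materialize() {
        if (materialized == null) materialized = source.get();
        return materialized;
    }
}

final class LazySketchDemo {
    // Returns {mapper calls before access, first element, mapper calls after access}.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        LazySketch<Integer> lazy = LazySketch.fromIterable(Arrays.asList(1, 2, 3))
                .map(i -> { calls.incrementAndGet(); return i * 2; });
        int before = calls.get();  // map has not run yet
        int first = lazy.get(0);   // access triggers the whole pipeline
        int after = calls.get();   // every element was mapped, no short-circuiting
        return new int[] { before, first, after };
    }
}
```

In this sketch run() returns {0, 2, 3}: the mapper is not called until get(0), and at that point all three elements are transformed even though only the first is read.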
LazyListX
LazyListX extends ListX from cyclops-react (and JDK java.util.List).
ListX<Integer> lazy = LazyListX.fromIterable(myIterable);
//lazily define operations
ListX<ListX<Integer>> transformed = lazy.map(i->i*2)
.filter(i->i<100)
.grouped(2);
//operations performed when data is accessed
transformed.get(0).reduce(0,(a,b)->a+b);
Notes (repeated here for LazyListX only, but they hold for all lazy collections):
- LazyListX cannot contain nulls (extended operations will throw a NullPointerException). Use ListX from cyclops-react for an extended List that can contain nulls.
- Data access and modification operations are eager; transformations are lazy.
- A LazyListX is not a Stream: eager operations materialize the entire list (there is no short-circuiting, for example).
LazyDequeX
LazyDequeX extends DequeX from cyclops-react (and JDK java.util.Deque).
DequeX<Integer> lazy = LazyDequeX.fromIterable(myIterable);
//lazily define operations
DequeX<ListX<Integer>> transformed = lazy.map(i->i*2)
.filter(i->i<100)
.grouped(2);
//operations performed when data is accessed
transformed.get(0).reduce(0,(a,b)->a+b);
LazyQueueX
LazyQueueX extends QueueX from cyclops-react (and JDK java.util.Queue).
QueueX<Integer> lazy = LazyQueueX.fromIterable(myIterable);
//lazily define operations
QueueX<ListX<Integer>> transformed = lazy.map(i->i*2)
.filter(i->i<100)
.sliding(2,1);
//operations performed when data is accessed
transformed.get(0).reduce(0,(a,b)->a+b);
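The examples above use grouped(2) and sliding(2,1). As a rough guide to the difference, here is a JDK-only sketch of the two windowing styles. WindowSketch and its methods are hypothetical helpers written for this illustration, not cyclops code, and this sketch only emits full sliding windows, so behaviour at the tail of the data may differ from the library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers written for illustration; not part of cyclops.
final class WindowSketch {
    // Non-overlapping batches of `size` elements; the last batch may be shorter.
    static <T> List<List<T>> grouped(List<T> in, int size) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < in.size(); i += size) {
            out.add(new ArrayList<>(in.subList(i, Math.min(i + size, in.size()))));
        }
        return out;
    }

    // Overlapping windows of `size` elements, advancing by `increment` each time.
    // Assumption of this sketch: only complete windows are emitted.
    static <T> List<List<T>> sliding(List<T> in, int size, int increment) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i + size <= in.size(); i += increment) {
            out.add(new ArrayList<>(in.subList(i, i + size)));
        }
        return out;
    }
}
```

With the input [2, 4, 6, 8], grouped(in, 2) gives [[2, 4], [6, 8]] and sliding(in, 2, 1) gives [[2, 4], [4, 6], [6, 8]].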
FluxSource
For pushing data into Flux and Mono types
PushableFlux<Integer> pushable = FluxSource.ofUnbounded();
pushable.getQueue()
.offer(1);
//on a separate thread
pushable.getFlux()
.map(i->i*2)
.subscribe(System.out::println);
//then push data into your Flux
pushable.getQueue()
.offer(2);
//close the transfer Queue
pushable.getQueue()
.close();
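The push pattern above (offer to a transfer queue, consume on another thread, then close) can be modelled with JDK classes alone. The sketch below is a hypothetical stand-in: PushableSketch is not the FluxSource implementation, and it uses Optional.empty() as an assumed "closed" marker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a pushable source; not the FluxSource implementation.
final class PushableSketch {
    // Optional.empty() acts as the "closed" signal on the transfer queue.
    private final BlockingQueue<Optional<Integer>> queue = new LinkedBlockingQueue<>();

    void offer(int value) { queue.add(Optional.of(value)); }
    void close() { queue.add(Optional.empty()); }

    // Consumer side: blocks for each element, doubles it, stops on close.
    List<Integer> drainDoubled() throws InterruptedException {
        List<Integer> seen = new ArrayList<>();
        while (true) {
            Optional<Integer> next = queue.take();
            if (!next.isPresent()) return seen;
            seen.add(next.get() * 2);
        }
    }

    static List<Integer> demo() {
        PushableSketch pushable = new PushableSketch();
        List<Integer> result = new ArrayList<>();
        pushable.offer(1);
        // Consume on a separate thread, as in the example above.
        Thread consumer = new Thread(() -> {
            try { result.addAll(pushable.drainDoubled()); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        consumer.start();
        pushable.offer(2);   // push more data after the consumer has started
        pushable.close();    // signal that no more data will arrive
        try { consumer.join(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return result;
    }
}
```

Here demo() returns [2, 4]: elements pushed before and after the consumer starts are both delivered, in order, and the consumer finishes once the close signal arrives.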
Documentation for StreamSource (cyclops-react / extended JDK analogue of FluxSource)
Blog post on pushing data into Java 8 Streams
Documentation for working with Queues
Joining Streams with ReactorPipes
ReactorPipes provides an API for flexible joining of multiple different Stream types.
ReactorPipes<String,Integer> pipes = ReactorPipes.of();
//store a transfer Queue with a max size of 1,000 entries
pipes.register("transfer1",QueueFactories.boundedQueue(1_000));
//connect a Flux to transfer1
Maybe<Flux<Integer>> connected = pipes.flux("transfer1");
Flux<Integer> stream = connected.get();
//Setup a producing Stream
ReactiveSeq<Integer> seq = ReactiveSeq.generate(this::loadData)
.map(this::processData);
pipes.publishToAsync("transfer1",seq);
stream.map(e->handleNextElement(e))
.subscribe(this::save);
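To make the idea of named, bounded transfer queues more concrete, here is a small JDK-only sketch. PipesSketch and its methods are hypothetical and do not reflect how ReactorPipes is implemented; the point is only to show what a size limit on a registered queue means for a producer.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of named transfer queues; not the ReactorPipes implementation.
final class PipesSketch<K, V> {
    private final Map<K, BlockingQueue<V>> registered = new ConcurrentHashMap<>();

    // Store a bounded queue under the given key.
    void register(K key, int maxSize) {
        registered.put(key, new ArrayBlockingQueue<>(maxSize));
    }

    // Empty when nothing is registered under the key, mirroring the Maybe in the example above.
    Optional<BlockingQueue<V>> queue(K key) {
        return Optional.ofNullable(registered.get(key));
    }

    // Non-blocking publish: false if the key is unknown or the bounded queue is full.
    boolean publish(K key, V value) {
        return queue(key).map(q -> q.offer(value)).orElse(false);
    }
}
```

With a queue registered at a max size of 2, the first two publish calls succeed and the third returns false until a consumer removes an element. A blocking producer would use put instead of offer and wait for space.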
Example for comprehensions with Flux
import static com.aol.cyclops.reactor.Fluxes.forEach;
Flux<Integer> result = forEach(Flux.just(10,20),a->Flux.<Integer>just(a+10)
,(a,b)->a+b);
//Flux[30,50]
Example for comprehensions with Mono
import static com.aol.cyclops.reactor.Monos.forEach;
Mono<Integer> result = forEach(Mono.just(10),a->Mono.<Integer>just(a+10)
,(a,b)->a+b);
//Mono[30]
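A for comprehension of this shape is conventionally read as a flatMap over the first container with a map over the second. The JDK-only sketch below shows that reading using Stream and CompletableFuture as stand-ins for Flux and Mono; ForComprehensionSketch is a hypothetical name, and this is an analogy rather than the code forEach runs internally.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK analogues of the two examples above; Stream stands in for Flux,
// CompletableFuture stands in for Mono.
final class ForComprehensionSketch {
    // Multi-valued case: for each a, derive b = a + 10, then yield a + b.
    static List<Integer> manyValues() {
        return Stream.of(10, 20)
                .flatMap(a -> Stream.of(a + 10).map(b -> a + b))
                .collect(Collectors.toList());
    }

    // Single-valued case: same shape, with thenCompose playing the role of flatMap.
    static int singleValue() {
        return CompletableFuture.completedFuture(10)
                .thenCompose(a -> CompletableFuture.completedFuture(a + 10)
                        .thenApply(b -> a + b))
                .join();
    }
}
```

manyValues() returns [30, 50] and singleValue() returns 30, matching the results noted in the examples above.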
FluxT monad transformer
import static com.aol.cyclops.reactor.FluxTs.fluxT;
FluxTSeq<Integer> nested = fluxT(Flux.just(Flux.just(1,2,3),Flux.just(10,20,30)));
FluxTSeq<Integer> mapped = nested.map(i->i*3);
//mapped = [Flux[Flux[3,6,9],Flux[30,60,90]]]
MonoT monad transformer
import static com.aol.cyclops.reactor.MonoTs.monoT;
MonoTSeq<Integer> nested = monoT(Flux.just(Mono.just(1),Mono.just(10)));
MonoTSeq<Integer> mapped = nested.map(i->i*3);
//mapped = [Flux[Mono[3],Mono[30]]]
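What a transformer's map saves you from writing is the nested map over the outer and inner containers. The JDK-only sketch below spells that nesting out by hand, with List standing in for Flux and Optional standing in for Mono. TransformerSketch is a hypothetical name and these helpers are not part of cyclops.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hand-written equivalents of mapping "through" a nested container.
final class TransformerSketch {
    // Analogue of the FluxT example: map over the elements of every inner list.
    static List<List<Integer>> mapNestedLists(List<List<Integer>> nested) {
        return nested.stream()
                .map(inner -> inner.stream().map(i -> i * 3).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    // Analogue of the MonoT example: map inside every inner single-valued container.
    static List<Optional<Integer>> mapNestedOptionals(List<Optional<Integer>> nested) {
        return nested.stream()
                .map(inner -> inner.map(i -> i * 3))
                .collect(Collectors.toList());
    }
}
```

For [[1, 2, 3], [10, 20, 30]] the first method returns [[3, 6, 9], [30, 60, 90]], and for [Optional[1], Optional[10]] the second returns [Optional[3], Optional[30]]. A transformer lets the same result be expressed as a single map call on the wrapped value.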
Getting Cyclops 8.2.0
MODULE_NAMES : cyclops-rx, cyclops-reactor, cyclops-guava, cyclops-functionaljava, cyclops-javaslang
Gradle
compile 'com.aol.cyclops:MODULE_NAME:8.2.0'
Maven
<dependency>
<groupId>com.aol.cyclops</groupId>
<artifactId>MODULE_NAME</artifactId>
<version>8.2.0</version>
</dependency>
Javadoc
http://www.javadoc.io/doc/com.aol.cyclops/cyclops-rx/8.2.0
http://www.javadoc.io/doc/com.aol.cyclops/cyclops-reactor/8.2.0
http://www.javadoc.io/doc/com.aol.cyclops/cyclops-javaslang/8.2.0
http://www.javadoc.io/doc/com.aol.cyclops/cyclops-functionaljava/8.2.0
http://www.javadoc.io/doc/com.aol.cyclops/cyclops-guava/8.2.0