Distributed Streams

Now that Infinispan supports Java 8, we can take full advantage of some of the new features.  One of the big features of Java 8 is the new Stream classes.  This flips the head on processing data so that instead of having to iterate upon the data yourself the underlying Stream handles that and you just provide the operations to perform on it.  This lends itself great to distributed processing as the iteration is handled entirely by the implementation (in this case Infinispan).

I therefore am glad to introduce for Infinispan 8, the feature Distributed Streams!  This allows for any operation you can perform on a regular Stream to also be performed on a Distributed cache (assuming the operation and data is marshallable).


When using a distributed or replicated cache, the keys and values of the cache must be marshallable.  This is the same case for intermediate and terminal operations when using the distributed streams.  Normally you would have to provide an instance of some new class that is either Serializable or has an Externalizer registered for it as described in the marshallable section of the user guide.

However, Java 8 also introduced lambdas, which can be defined as serializable very easily (although it is a bit awkward).  An example of this serialization can be found here.

Some of you may also be aware of the Collectors class which is used with the collect method on a stream.  Unfortunately, all of the Collectors produced are not able to be marshalled.  As such, Infinispan has added a utility class that can work in conjunction with the Collectors class.  This allows you to still use any combination of the Collectors classes and still work properly when everything is required to be marshalled.


Java 8 streams naturally have a sense of parallelism.  That is that the stream can be marked as being parallel.  This in turn allows for the operations to be performed in parallel using multiple threads.  The best part is how simple it is to do.  The stream can be made parallel when first retrieving it by invoking parallelStream or you can optionally enable it after the Stream is retrieved by just invoking parallel.

The new Distributed streams from Infinispan take this one step further, which I am calling parallel distribution.  That is that since data is already partitioned across nodes we can also allow operations to be ran simultaneously on different nodes at the same time.  This option is enabled by default.  However this can be controlled by using the new CacheStream interface discussed just below.  Also, to be clear, the Java 8 parallel can be used in conjunction with parallel distribution.  This just means you will have concurrent operations running on multiple nodes across multiple threads on each node.

CacheStream interface

There is a new interface Cachestream provided that allows for controlling additional options when using a Distributed Stream.  I am highlighting the added methods (note comments have been removed from gist)


This method controls how many elements are brought back at one time for operations that are key aware.  These operations are (spl)iterator and forEach.  This is useful to tweak how many keys are held in memory from a remote node.  Thus it is a tradeoff of performance (more keys) versus memory.  This defaults to the chunk size as configured by state transfer.

parallelDistribution / sequentialDistribution

This was discussed in the parallelism section above.  Note that all commands have this enabled by default except for spl(iterator) methods.


This method can be used to have the distributed stream only operate on a given set of keys.  This is done in a very efficient way as it will only perform the operation on node(s) that own the given keys.  Using a given set of keys also allows for constant access time from the data container/store as the cache doesn’t have to look at every single entry in the cache.

filterKeySegments (advanced users only)

This is useful to do filtering of instances in a more performant way.  Normally, you could use the filter intermediate operation, but this method is performed before any of the operations are performed to most efficiently limit the entries that are presented for stream processing.  For example, if only a subset of segments are required, it may not have to send a remote request.

segmentCompletionListener (advanced users only)

Similar to the previous method, this is related to key segments.  This listener allows for the end user to be notified when a segment has been completed for processing.  This can be useful if you want to keep track of completion and if this node goes down, you can rerun the processing with only the unprocessed segments.  Currently, this listener is only supported for spl(iterator) methods.

disableRehashAware (advanced users only)

By default, all stream operations are what is called rehash aware.  That is if a node joins or leaves the cluster while the operation is in progress the cluster will be aware of this and ensure that all data is processed properly with no loss (assuming no data was actually lost).

This can be disabled by calling disableRehashAware; however, if a rehash is to occur in the middle of the operation, it is possible that all data may not be processed.  It should be noted that data is not processed multiple times with this disabled, only a loss of data can occur.

This option is not normally recommended unless you have a situation where you can afford to only operate on a subset of data.  The tradeoff is that the operation can perform faster, especially (spl)iterator and forEach methods.


The age old example of map/reduce is always word count.  Streams allow you to do that as well!  Here is an equivalent word count example assuming you have a Cache containing String keys and values and you want the count of all words in the values.  Some of you may be wondering how this  relates to our existing map/reduce framework.  The plan is to deprecate the existing Map/Reduce and replace it completely with the new distributed streams at a later point.

Remember though that distributed streams can do so much more than just map/reduce. And there are a lot of examples already out there for streams. To use the distributed streams, you just need to make sure your operations are marshallable, and you are good to go.

Here are a few pages with examples of how to use streams straight from Oracle:

I hope you enjoy Distributed Streams.  We hope they change how you interact with your data in the cluster!

Let us know what you think, any issues or usages you would love to share!





JUGs alpha as7 asymmetric clusters asynchronous beta c++ cdi chat clustering community conference configuration console data grids data-as-a-service database devoxx distributed executors docker event functional grouping and aggregation hotrod infinispan java 8 jboss cache jcache jclouds jcp jdg jpa judcon kubernetes listeners meetup minor release off-heap openshift performance presentations product protostream radargun radegast recruit release release 8.2 9.0 final release candidate remote query replication queue rest query security spring streams transactions vert.x workshop 8.1.0 API DSL Hibernate-Search Ickle Infinispan Query JP-QL JSON JUGs JavaOne LGPL License NoSQL Open Source Protobuf SCM administration affinity algorithms alpha amazon anchored keys annotations announcement archetype archetypes as5 as7 asl2 asynchronous atomic maps atomic objects availability aws beer benchmark benchmarks berkeleydb beta beta release blogger book breizh camp buddy replication bugfix c# c++ c3p0 cache benchmark framework cache store cache stores cachestore cassandra cdi cep certification cli cloud storage clustered cache configuration clustered counters clustered locks codemotion codename colocation command line interface community comparison compose concurrency conference conferences configuration console counter cpp-client cpu creative cross site replication csharp custom commands daas data container data entry data grids data structures data-as-a-service deadlock detection demo deployment dev-preview development devnation devoxx distributed executors distributed queries distribution docker documentation domain mode dotnet-client dzone refcard ec2 ehcache embedded embedded query equivalence event eviction example externalizers failover faq final fine grained flags flink full-text functional future garbage collection geecon getAll gigaspaces git github gke google graalvm greach conf gsoc hackergarten hadoop hbase health hibernate hibernate ogm hibernate search hot rod hotrod hql http/2 ide index indexing india infinispan infinispan 8 infoq internationalization interoperability interview introduction iteration javascript jboss as 5 jboss asylum jboss cache jbossworld jbug jcache jclouds jcp jdbc jdg jgroups jopr jpa js-client jsr 107 jsr 347 jta judcon kafka kubernetes lambda language learning leveldb license listeners loader local mode lock striping locking logging lucene mac management map reduce marshalling maven memcached memory migration minikube minishift minor release modules mongodb monitoring multi-tenancy nashorn native near caching netty node.js nodejs non-blocking nosqlunit off-heap openshift operator oracle osgi overhead paas paid support partition handling partitioning performance persistence podcast presentation presentations protostream public speaking push api putAll python quarkus query quick start radargun radegast react reactive red hat redis rehashing releaase release release candidate remote remote events remote query replication rest rest query roadmap rocksdb ruby s3 scattered cache scripting second level cache provider security segmented server shell site snowcamp spark split brain spring spring boot spring-session stable standards state transfer statistics storage store store by reference store by value streams substratevm synchronization syntax highlighting tdc testing tomcat transactions tutorial uneven load user groups user guide vagrant versioning vert.x video videos virtual nodes vote voxxed voxxed days milano wallpaper websocket websockets wildfly workshop xsd xsite yarn zulip
Posted by Unknown on 2015-09-07
Tags: java 8 streams API
back to top