Monday, 19 February 2018

Distributed iteration improvements

Infinispan hasn’t always provided a way for iterating upon entries in a distributed cache. In fact the first iteration wasn’t until Infinispan 7. Then in Infinispan 8, with the addition of Java 8, we fully integrated this into distributed streams, which brought some minor iteration improvements in performance.

We are proud to announce that with Infinispan 9.2 there are even more improvements. This contains no API changes, although those will surely come in the future. This one is purely for performance and utilization.

New implementation details


There are a few different aspects that have been changed.  A lot of these revolve around the amount of entries being retrieved at once, which if you are familiar with DistributedStreams can be configured via the distributedBatchSize method. Note that if this is not specified it defaults to the chunk size in state transfer.

Entry retrieval is now pull based instead of push

Infinispan core (embedded) has added rxjava2 and reactive streams as dependencies and rewrote all of the old push style iterator code over to pull style to fully utilize the Publisher and Subscriber interfaces.

With this we only pull up to the batchSize in entries at a time from any set of nodes. The old style utilized push with call stack blocking, which could return up two times the amount of entries. Also since we aren’t performing call stack blocking, we don’t have to waste threads as these calls to retrieve entries are done async and finish very quickly irrespective of user interaction. The old method required multiple threads to be reserved for this purpose.

Streamed batches

The responses from a remote node are written directly to the output stream so there are no intermediate collections allocated. This means we only have to iterate upon the data once as we retain the iterator between requests. On the originator we still have to store the batches in a collection to be enqueued for the user to pull.

Rewritten Parallel Distribution

Great care was taken to implement parallel distribution in a way to vastly reduce contention and ensure that we properly follow the batchSize configuration.

When parallel distribution is in use the new implementation will start 4 remote node requests sharing the batch size (so each one gets 1/4). This way we can guarantee that we only have the desired size irrespective of the number of nodes in the cluster. The old implementation would request batchSize from all nodes at the same time. So not only did it reserve a thread for node but could easily swamp your JVM memory, causing OutOfMemoryErrors (which no one likes). The latter alone made us force the default to be sequential distribution when using an iterator.

The old implementation would write entries from all nodes (including local) to the same shared queue. The new implementation has a different queue for each request, which allows for faster queues with no locking to be used.

Due to these changes and other isolations between threads, we can now make parallel distribution the default setting for the iterator method. And as you will see this has improved performance nicely.


We have written a JMH test harness specifically for this blog post, testing 9.1.5.Final build against latest 9.2.0.SNAPSHOT. The test runs by default with 4GB of heap with 6 nodes in a distributed cache with 2 owners. It has varying entry count, entry sizes and distributed batch sizes.

Due to the variance in each test a large number of tests were ran and with different permutations to make sure it covered a large amount of test cases. The JMH test that was ran can be found at github. All the default settings were used for the run except -t4 (runs with 4 worker threads) was provided. This was all ran on my measly laptop (i7-4810MQ and 16 GB) - maxing out the CPU was not a hard task.

CAVEAT: The tests don’t do anything with the iterator and just try to pull them as fast as they can. Obviously if you have a lot of processing done between iterations you will likely not see as good of a performance increase.

The entire results can be found here. It shows each permutation and how many operations per second and finds the difference (green shows 5% or more and red shows -5% or less).

Operation Average Gain Code

Specified Distribution Mode






No Rehash



The above 3 rows show a few different ways you could have been invoking the iterator method. The second row is probably by far the most used case. In this case you should see around a 11% increase in performance (results will vary). This is due to the new pulling method as well as parallel distribution becoming the new default running mode. It is unlikely a user was using the other 2 methods, but are provided for a more complete view.

If you were specifying a distribution mode manually, either sequential or distribution you will only see a few percent faster run (3.5%), but every little bit helps! Also if you can switch to parallel you may want to think about doing so.

Also you can see if you were running with rehash disabled prior, it has even more gains (14%). Those don’t even include the fact that no rehash was 28% faster than with before (which means it is about 32% faster in general now). So if you can get away with a at most once guarantee, disabling rehash will provide the best throughput.

Whats next? 

As was mentioned this is not exposed to the user directly. You still interact with the iterator as you would normally. We should remedy this at some point.

Expose new method

We would love to eventually expose a method to return a Publisher directly to the user so that they can get the full benefits of having a pull based implementation underneath.

This way any intermediate operations applied to the stream before would be distributed and anything applied to the Publisher would be done locally. And just like the iterator method this publisher would be fully rehash aware if you have it configured to do so and would make sure you get all entries delivered in an exactly once fashion (rehash disabled guarantees at most once).

Another side benefit is that the Subscriber methods could be called on different threads so there is no overhead required on the ISPN side for coordinating these into queue(s). Thus the Subscriber should be able to retrieve all entries faster than just doing an iterator.

Java 9 Flow

Also many of you may be wondering why we aren’t using the new Flow API introduced in Java 9. Luckily the Flow API is a 1:1 conversion of reactive streams. So whenever Infinispan will start supporting Java 9 interfaces/classes, we hope to properly expose these as the JDK classes.

Segment Based Iteration 

With Infinispan 9.3, we hope to introduce data container and cache store segment aware iteration. This means when iterating over either we would only have to process entries that map to a given segment. This should reduce the time and processing for iteration substantially, especially for cache stores. Keep your eyes out for a future blog post detailing these as 9.3 development commences.

Give us Feedback

We hope you find a bit more performance when working with your distributed iteration. Also we value any feedback on what you want our APIs to look like or find any bugs. As always let us know at any of the places listed here.

Posted by Unknown on 2018-02-19
Tags: performance streams distribution iteration



JUGs alpha as7 asymmetric clusters asynchronous beta c++ cdi chat clustering community conference configuration console data grids data-as-a-service database devoxx distributed executors docker event functional grouping and aggregation hotrod infinispan java 8 jboss cache jcache jclouds jcp jdg jpa judcon kubernetes listeners meetup minor release off-heap openshift performance presentations product protostream radargun radegast recruit release release 8.2 9.0 final release candidate remote query replication queue rest query security spring streams transactions vert.x workshop 8.1.0 API DSL Hibernate-Search Ickle Infinispan Query JP-QL JSON JUGs JavaOne LGPL License NoSQL Open Source Protobuf SCM administration affinity algorithms alpha amazon anchored keys annotations announcement archetype archetypes as5 as7 asl2 asynchronous atomic maps atomic objects availability aws beer benchmark benchmarks berkeleydb beta beta release blogger book breizh camp buddy replication bugfix c# c++ c3p0 cache benchmark framework cache store cache stores cachestore cassandra cdi cep certification cli cloud storage clustered cache configuration clustered counters clustered locks codemotion codename colocation command line interface community comparison compose concurrency conference conferences configuration console counter cpp-client cpu creative cross site replication csharp custom commands daas data container data entry data grids data structures data-as-a-service deadlock detection demo deployment dev-preview development devnation devoxx distributed executors distributed queries distribution docker documentation domain mode dotnet-client dzone refcard ec2 ehcache embedded embedded query equivalence event eviction example externalizers failover faq final fine grained flags flink full-text functional future garbage collection geecon getAll gigaspaces git github gke google graalvm greach conf gsoc hackergarten hadoop hbase health hibernate hibernate ogm hibernate search hot rod hotrod hql http/2 ide index indexing india infinispan infinispan 8 infoq internationalization interoperability interview introduction iteration javascript jboss as 5 jboss asylum jboss cache jbossworld jbug jcache jclouds jcp jdbc jdg jgroups jopr jpa js-client jsr 107 jsr 347 jta judcon kafka kubernetes lambda language learning leveldb license listeners loader local mode lock striping locking logging lucene mac management map reduce marshalling maven memcached memory migration minikube minishift minor release modules mongodb monitoring multi-tenancy nashorn native near caching netty node.js nodejs non-blocking nosqlunit off-heap openshift operator oracle osgi overhead paas paid support partition handling partitioning performance persistence podcast presentation presentations protostream public speaking push api putAll python quarkus query quick start radargun radegast react reactive red hat redis rehashing releaase release release candidate remote remote events remote query replication rest rest query roadmap rocksdb ruby s3 scattered cache scripting second level cache provider security segmented server shell site snowcamp spark split brain spring spring boot spring-session stable standards state transfer statistics storage store store by reference store by value streams substratevm synchronization syntax highlighting tdc testing tomcat transactions tutorial uneven load user groups user guide vagrant versioning vert.x video videos virtual nodes vote voxxed voxxed days milano wallpaper websocket websockets wildfly workshop xsd xsite yarn zulip

back to top