Tuesday, 02 October 2018

Segmented Data Containers: Distributed Stream Performance Boost

Welcome to the first of several blog posts that describe the segmentation of containers that Infinispan uses to store data. Some of you may have noticed in the previous 9.3.0.Final notes that we announced a new feature named “Segmented On-Heap Data Container”. We also mentioned that “It improves performance of stream operations”, but what does that really mean?

What is a segmented data container and why does it matter? 

Imagine a cluster of 4 nodes in distributed mode (numOwners = 2) with entries for k0 - k13. It might look like this:


The data is distributed between the nodes with only two copies of each entry available. However, the data itself is stored internally in the same Map instance. As a result, when performing operations on all entries in the cache, Infinispan must iterate over the same data multiple times. This degrades performance.

As of Infinispan 9.3, a segmented data container is available to separate data by segments. Although only on-heap bounded and unbounded implementations are currently available.

With a segmented data container, that same data set might look like this:


Because Infinispan internally reasons on data in terms of segments, a segmented data container lets Infinispan process data only in specific segments. This allows for operations performed upon all entries to require iteration over the data only once.

Actual Performance Difference

So with the above example you might be thinking that the performance increase maximum is two times throughput, since numOwners is two. This is close, but not quite correct. While iterating on the data we also have to determine what segment an entry belongs to. With a segmented container we know this already, so there is no need to calculate that. This provides additional performance, as you will see.

The following graphs were generated using the benchmark at https://github.com/infinispan/infinispan-benchmarks/tree/master/iteration. The following command was run: java -jar target/benchmarks.jar -pvalueObjectSize=1000 -pentryAmount=50000 -pbatchSize=4096


The preceding graph is the result of the iteration methods. As you can notice the performance increase isn’t that much… why not?!?

Unfortunately, remote iteration requires a lot of network overhead, so we don’t get to see the full benefits of segmentation. But at least it is about 5-12% faster, not too shabby.

Now to show the real improvement, here is the chart showing the performance increase for the Cache#size operation:


If you notice there is huge increase in performance: almost a three fold increase over the non-segmented container, even though numOwners is only two. The old segment calculation adds a bit of overhead compared to just incrementing a number.

So keep in mind this change will show a larger gain in performance if the result returned is smaller, especially if it is a fixed size, such as a single int for Cache#size.

What about gets and puts?

Having the container segmented should also affect get and put performance as well, right? In testing the difference for get and puts are less than one percent, in favor of segmentation due to some optimizations we were able to add.

How do I enable this?

So the performance gains are noticeable, especially when the remote operation returns a small data set. But how can a user configure this? This is the nice part, due to no performance loss with other operations the container will always be segmented as long as the cache mode supports segmentation. That is if it is a Distributed, Replicated or Scattered cache.

A real-life example and closing

Since this feature has been around a while already, we actually have users gaining benefits from this feature. An example can be found at https://developer.jboss.org/message/983837#983837. In this case the user only upgraded to Infinispan 9.3 and received over a three-fold increase in performance when using distributed streams. It actually starts to bring distributed streams performance within range of indexed query for some use cases.

So, by upgrading your application to Infinispan 9.3 or newer, you will benefit from these improvements. There will be future posts regarding segmentation, including support for stores. Either way please feel free to download Infinispan, report bugs, chat with us, ask questions on the forum or on StackOverflow.

Posted by Unknown on 2018-10-02
Tags: streams segmented data container performance

Monday, 08 January 2018

Improving collect() for distributed Java Streams in Infinispan 9.2

As we progress with the release of Infinispan 9.2 pre-releases, it’s important to highlight some of the more interesting improvements from an end-user perspective.

As mentioned before, Infinispan Distributed Java Streams can be used to calculate analytics over existing data. Through overloading of methods, Infinispan is able to offer a simple way of passing lambdas that are made to be Serializable without the need of explicit casting. Being able to produce binary formats for the lambdas is an important step for java streams executions to be distributed. In this example, the cached values are being filtered to find those that have a delay bigger than 0. This lambda can be safely distributed without the need to cast to Serializable because values().stream() returns org.infinispan.CacheStream that overloads filter to take a SerializablePredicate:

Cache<String, Stop> cache = …​ cache.values().stream()   .filter(e → e.delayMin > 0);

However, there was one area which was still a bit clunky to use: Java Collectors. When Java Streams came out, the JDK provided a class called java.util.stream.Collectors which includes a lot of helper methods for collecting results after stream processing. The problem with the Collector instances returned by the helper methods is that they’re not Serializable.

Before Infinispan 9.2, we worked around this problem with the help of org.infinispan.stream.CacheCollectors which defined a serializableCollector method that took a SerializableSupplier<Collector<T, ?, R>>. The aim here was this: even if the Collector instance is not Serializable, the function that creates the Collector can be made to be Serializable. It could be used this way:

Cache<String, Stop> cache = …​ cache.values().stream().collect(   CacheCollectors.serializableCollector) → Collectors.groupingBy(       e → getHourOfDay(e.departureTs),       Collectors.counting());

Although this worked, it was a clunky, so in Infinispan 9.2 we overloaded collect() in org.infinispan.CacheStream to take SerializableSupplier<Collector<T, ?, R>>. This means that in Infinispan 9.2, the code above can be written like this instead:

Cache<String, Stop> cache = …​ cache.values().stream().collect(   () → Collectors.groupingBy(       e → getHourOfDay(e.departureTs),       Collectors.counting() ));

This is a cleaner way of making sure Collector instances returned by java.util.stream.Collectors can be distributed.

Cheers, Galder

Posted by Galder Zamarreño on 2018-01-08
Tags: streams



JUGs alpha as7 asymmetric clusters asynchronous beta c++ cdi chat clustering community conference configuration console data grids data-as-a-service database devoxx distributed executors docker event functional grouping and aggregation hotrod infinispan java 8 jboss cache jcache jclouds jcp jdg jpa judcon kubernetes listeners meetup minor release off-heap openshift performance presentations product protostream radargun radegast recruit release release 8.2 9.0 final release candidate remote query replication queue rest query security spring streams transactions vert.x workshop 8.1.0 API DSL Hibernate-Search Ickle Infinispan Query JP-QL JSON JUGs JavaOne LGPL License NoSQL Open Source Protobuf SCM administration affinity algorithms alpha amazon annotations announcement archetype archetypes as5 as7 asl2 asynchronous atomic maps atomic objects availability aws beer benchmark benchmarks berkeleydb beta beta release blogger book breizh camp buddy replication bugfix c# c++ c3p0 cache benchmark framework cache store cache stores cachestore cassandra cdi cep certification cli cloud storage clustered cache configuration clustered counters clustered locks codemotion codename colocation command line interface community comparison compose concurrency conference conferences configuration console counter cpp-client cpu creative cross site replication csharp custom commands daas data container data entry data grids data structures data-as-a-service deadlock detection demo deployment dev-preview devnation devoxx distributed executors distributed queries distribution docker documentation domain mode dotnet-client dzone refcard ec2 ehcache embedded query equivalence event eviction example externalizers failover faq final fine grained flags flink full-text functional future garbage collection geecon getAll gigaspaces git github gke google graalvm greach conf gsoc hackergarten hadoop hbase health hibernate hibernate ogm hibernate search hot rod hotrod hql http/2 ide index indexing india infinispan infinispan 8 infoq internationalization interoperability interview introduction iteration javascript jboss as 5 jboss asylum jboss cache jbossworld jbug jcache jclouds jcp jdbc jdg jgroups jopr jpa js-client jsr 107 jsr 347 jta judcon kafka kubernetes lambda language leveldb license listeners loader local mode lock striping locking logging lucene mac management map reduce marshalling maven memcached memory migration minikube minishift minor release modules mongodb monitoring multi-tenancy nashorn native near caching netty node.js nodejs nosqlunit off-heap openshift operator oracle osgi overhead paas paid support partition handling partitioning performance persistence podcast presentations protostream public speaking push api putAll python quarkus query quick start radargun radegast react reactive red hat redis rehashing releaase release release candidate remote remote events remote query replication rest rest query roadmap rocksdb ruby s3 scattered cache scripting second level cache provider security segmented server shell site snowcamp spark split brain spring spring boot spring-session stable standards state transfer statistics storage store store by reference store by value streams substratevm synchronization syntax highlighting testing tomcat transactions uneven load user groups user guide vagrant versioning vert.x video videos virtual nodes vote voxxed voxxed days milano wallpaper websocket websockets wildfly workshop xsd xsite yarn zulip

back to top