Monday, 19 February 2018
Infinispan hasn’t always provided a way for iterating upon entries in a distributed cache. In fact the first iteration wasn’t until Infinispan 7. Then in Infinispan 8, with the addition of Java 8, we fully integrated this into distributed streams, which brought some minor iteration improvements in performance.
We are proud to announce that with Infinispan 9.2 there are even more improvements. This contains no API changes, although those will surely come in the future. This one is purely for performance and utilization.
There are a few different aspects that have been changed. A lot of these revolve around the amount of entries being retrieved at once, which if you are familiar with DistributedStreams can be configured via the distributedBatchSize method. Note that if this is not specified it defaults to the chunk size in state transfer.
Infinispan core (embedded) has added rxjava2 and reactive streams as dependencies and rewrote all of the old push style iterator code over to pull style to fully utilize the Publisher and Subscriber interfaces.
With this we only pull up to the batchSize in entries at a time from any set of nodes. The old style utilized push with call stack blocking, which could return up two times the amount of entries. Also since we aren’t performing call stack blocking, we don’t have to waste threads as these calls to retrieve entries are done async and finish very quickly irrespective of user interaction. The old method required multiple threads to be reserved for this purpose.
The responses from a remote node are written directly to the output stream so there are no intermediate collections allocated. This means we only have to iterate upon the data once as we retain the iterator between requests. On the originator we still have to store the batches in a collection to be enqueued for the user to pull.
Great care was taken to implement parallel distribution in a way to vastly reduce contention and ensure that we properly follow the batchSize configuration.
When parallel distribution is in use the new implementation will start 4 remote node requests sharing the batch size (so each one gets 1/4). This way we can guarantee that we only have the desired size irrespective of the number of nodes in the cluster. The old implementation would request batchSize from all nodes at the same time. So not only did it reserve a thread for node but could easily swamp your JVM memory, causing OutOfMemoryErrors (which no one likes). The latter alone made us force the default to be sequential distribution when using an iterator.
The old implementation would write entries from all nodes (including local) to the same shared queue. The new implementation has a different queue for each request, which allows for faster queues with no locking to be used.
Due to these changes and other isolations between threads, we can now make parallel distribution the default setting for the iterator method. And as you will see this has improved performance nicely.
We have written a JMH test harness specifically for this blog post, testing 9.1.5.Final build against latest 9.2.0.SNAPSHOT. The test runs by default with 4GB of heap with 6 nodes in a distributed cache with 2 owners. It has varying entry count, entry sizes and distributed batch sizes.
Due to the variance in each test a large number of tests were ran and with different permutations to make sure it covered a large amount of test cases. The JMH test that was ran can be found at github. All the default settings were used for the run except -t4 (runs with 4 worker threads) was provided. This was all ran on my measly laptop (i7-4810MQ and 16 GB) - maxing out the CPU was not a hard task.
CAVEAT: The tests don’t do anything with the iterator and just try to pull them as fast as they can. Obviously if you have a lot of processing done between iterations you will likely not see as good of a performance increase.
The entire results can be found here. It shows each permutation and how many operations per second and finds the difference (green shows 5% or more and red shows -5% or less).
Specified Distribution Mode
The above 3 rows show a few different ways you could have been invoking the iterator method. The second row is probably by far the most used case. In this case you should see around a 11% increase in performance (results will vary). This is due to the new pulling method as well as parallel distribution becoming the new default running mode. It is unlikely a user was using the other 2 methods, but are provided for a more complete view.
If you were specifying a distribution mode manually, either sequential or distribution you will only see a few percent faster run (3.5%), but every little bit helps! Also if you can switch to parallel you may want to think about doing so.
Also you can see if you were running with rehash disabled prior, it has even more gains (14%). Those don’t even include the fact that no rehash was 28% faster than with before (which means it is about 32% faster in general now). So if you can get away with a at most once guarantee, disabling rehash will provide the best throughput.
As was mentioned this is not exposed to the user directly. You still interact with the iterator as you would normally. We should remedy this at some point.
We would love to eventually expose a method to return a Publisher directly to the user so that they can get the full benefits of having a pull based implementation underneath.
This way any intermediate operations applied to the stream before would be distributed and anything applied to the Publisher would be done locally. And just like the iterator method this publisher would be fully rehash aware if you have it configured to do so and would make sure you get all entries delivered in an exactly once fashion (rehash disabled guarantees at most once).
Another side benefit is that the Subscriber methods could be called on different threads so there is no overhead required on the ISPN side for coordinating these into queue(s). Thus the Subscriber should be able to retrieve all entries faster than just doing an iterator.
Also many of you may be wondering why we aren’t using the new Flow API introduced in Java 9. Luckily the Flow API is a 1:1 conversion of reactive streams. So whenever Infinispan will start supporting Java 9 interfaces/classes, we hope to properly expose these as the JDK classes.
With Infinispan 9.3, we hope to introduce data container and cache store segment aware iteration. This means when iterating over either we would only have to process entries that map to a given segment. This should reduce the time and processing for iteration substantially, especially for cache stores. Keep your eyes out for a future blog post detailing these as 9.3 development commences.
We hope you find a bit more performance when working with your distributed iteration. Also we value any feedback on what you want our APIs to look like or find any bugs. As always let us know at any of the places listed here.
Tags: performance streams distribution iteration
Thursday, 22 March 2012
The feedback keeps coming, particularly from AS7 users, so we’ve decided to do another point release in the 5.1 'Brahma' series. Apart from fixing several issues, including a critical L1 cache memory leak in active/passive set ups, this version enables the JBoss Marshaller class resolver to be configured via both the old and new programmatic configuration API. This enables Infinispan to provide a better solution for marshalling/unmarshalling classes in modular environments.
Tags: marshalling release candidate distribution
Tuesday, 19 April 2011
A brand new Infinispan 5.0 "Pagoa" beta is out now, 5.0.0.BETA2 bringing even more goodies for Infinispan users:
Initial implementation of virtual nodes for consistent hash algorithm based distribution is included. This means that each Infinispan node can now pick multiple nodes in the hash wheel reducing the standard deviation and so improving the distribution of data. The configuration is done via the numVirtualNodes attribute in hash element.
The externalizer configuration has been revamped in order to make it more user-friendly! You only need the @SerializeWith annotation and an Externalizer implementation in its most basic form, but more advanced externalizer configuration is still available for particular use cases. The wiki on plugging externalizers has been rewritten to show these changes.
lazyDeserialization XML element has been renamed to *http://docs.jboss.org/infinispan/5.0/apidocs/config.html#ce_default_storeAsBinary[storeAsBinary]* in order to better represent its function. The previous programmatic configuration for this option has been deprecated to help ease migration but your XML will need changing.
All references to JOPR, including the maven module name have been renamed to RHQ. So bear make sure you plug your RHQ server with *infinispan-rhq-plugin.jar* instead of infinispan-jopr-plugin.jar
Tags: beta marshalling virtual nodes externalizers distribution
Friday, 21 August 2009
People have often commented on Buddy Replication (from JBoss Cache) not being available in Infinispan, and have asked how Infinispan’s far superior distribution mode works. I’ve decided to write this article to discuss the main differences from a high level. For deeper technical details, please visit the Infinispan wiki.
Scalability versus high availability These two concepts are often at odds with one another, even though they are commonly lumped together. What is usually good for scalability isn’t always good for high availability, and vice versa. When it comes to clustering servers, high availability often means simply maintaining more copies, so that if nodes fail - and with commodity hardware, this is expected - state is not lost. An extreme case of this is replicated mode, available in both JBoss Cache and Infinispan, where each node is a clone of its neighbour. This provides very high availability, but unfortunately, this does not scale well. Assume you have 2GB per node. Discounting overhead, with replicated mode, you can only address 2GB of space, regardless of how large the cluster is. Even if you had 100 nodes - seemingly 200GB of space! - you’d still only be able to address 2GB since each node maintains a redundant copy. Further, since every node needs a copy, a lot of network traffic is generated as the cluster size grows.
Enter Buddy Replication Buddy Replication (BR) was originally devised as a solution to this scalability problem. BR does not replicate state to every other node in the cluster. Instead, it chooses a fixed number of 'backup' nodes and only replicates to these backups. The number of backups is configurable, but in general it means that the number of backups is fixed. BR improved scalability significantly and showed near-linear scalability with increasing cluster size. This means that as more nodes are added to a cluster, the space available grows linearly as does the available computing power if measured in transactions per second.
But Buddy Replication doesn’t help everybody! BR was specifically designed around the HTTP session caching use-case for the JBoss Application Server, and heavily optimised accordingly. As a result, session affinity is mandated, and applications that do not use session affinity can be prone to a lot of data gravitation and 'thrashing' - data is moved back and forth across a cluster as different nodes attempt to claim 'ownership' of state. Of course this is not a problem with JBoss AS and HTTP session caching - session affinity is recommended, available on most load balancer hardware and/or software, is taken for granted, and is a well-understood and employed paradigm for web-based applications.
So we had to get better Just solving the HTTP session caching use-case wasn’t enough. A well-performing data grid needs to to better, and crucially, session affinity cannot be taken for granted. And this was the primary reason for not porting BR to Infinispan. As such, Infinispan does not and will not support BR as it is too restrictive.
Distribution Distribution is a new cache mode in Infinispan. It is also the default clustered mode - as opposed to replication, which isn’t scalable. Distribution makes use of familiar concepts in data grids, such as consistent hashing, call proxying and local caching of remote lookups. What this leads to is a design that does scale well - fixed number of replicas for each cache entry, just like BR - but no requirement for session affinity.
What about co-locating state? Co-location of state - moving entries about as a single block - was automatic and implicit with BR. Since each node always picked a backup node for all its state, one could visualize all of the state on a given node as a single block. Thus, colocation was trivial and automatic: whatever you put in Node1 will always be together, even if Node1 eventually dies and the state is accessed on Node2. However, this meant that state cannot be evenly balanced across a cluster since the data blocks are very coarse grained. With distribution, colocation is not implicit. In part due to the use of consistent hashing to determine where each cached entry resides, and also in part due to the finer-grained cache structure of Infinispan - key/value pairs instead of a tree-structure - this leads to individual entries as the granularity of state blocks. This means nodes can be far better balanced across a cluster. However, it does mean that certain optimizations which rely on co-location - such as keeping related entries close together - is a little more tricky.
One approach to co-locate state would be to use containers as values. For example, put all entries that should be colocated together into a HashMap. Then store the HashMap in the cache. But that is coarse-grained and ugly as an approach, and will mean that the entire HashMap would need to be locked and serialized as a single atomic unit, which can be expensive if this map is large.
Another approach is to use Infinispan’s AtomicMap API. This powerful API lets you group entries together, so they will always be colocated, locked together, but replication will be much finer-grained, allowing only deltas to the map to be replicated. So that makes replication fast and performant, but it still means everything is locked as a single atomic unit. While this is necessary for certain applications, it isn’t always be desirable.
One more solution is to implement your own ConsistentHash algorithm - perhaps extending DefaultConsistentHash. This implementation would have knowledge of your object model, and hashes related instances such that they are located together in the hash space. By far the most complex mechanism, but if performance and co-location really is a hard requirement then you cannot get better than this approach.
Session affinity mandatory
Applicable to a specific set of use cases due to the session affinity requirement
No session affinity needed
Co-location requires special treatment, ranging in complexity based on performance and locking requirements. By default, no co-location is provided
Applicable to a far wider range of use cases, and hence the default highly scalable clustered mode in Infinispan
Hopefully this article has sufficiently interested you in distribution, and has whetted your appetite for more. I would recommend the Infinispan wiki which has a wealth of information including interactive tutorials and GUI demos, design documents and API documentation. And of course you can’t beat downloading Infinispan and trying it out, or grabbing the source code and looking through the implementation details.
Tags: buddy replication partitioning distribution