Monday, 19 February 2018
Infinispan hasn’t always provided a way to iterate over entries in a distributed cache. In fact, the first implementation didn’t arrive until Infinispan 7. Then in Infinispan 8, with the move to Java 8, we fully integrated iteration into distributed streams, which brought some minor performance improvements.
We are proud to announce that Infinispan 9.2 brings even more improvements. This release contains no API changes, although those will surely come in the future. It is purely about performance and resource utilization.
Several aspects have changed. Many of them revolve around the number of entries retrieved at once, which, if you are familiar with distributed streams, can be configured via the distributedBatchSize method. Note that if this is not specified, it defaults to the chunk size used in state transfer.
Infinispan core (embedded) now depends on RxJava 2 and Reactive Streams, and all of the old push-style iterator code has been rewritten in a pull style to fully utilize the Publisher and Subscriber interfaces.
With this we only pull up to batchSize entries at a time from any set of nodes. The old style used push with call stack blocking, which could return up to two times that number of entries. Also, since we no longer block the call stack, we don’t waste threads: the calls to retrieve entries are asynchronous and finish very quickly irrespective of user interaction. The old method required multiple threads to be reserved for this purpose.
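To make the pull model concrete, here is a minimal sketch using only the JDK. It is not the Infinispan implementation: the RemoteSource class and the drain method are hypothetical stand-ins for a remote node and for the consumer. It only shows that a new batch is requested after the previous one has been consumed, so no more than batchSize entries are buffered at once.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PullBatchSketch {
    /** Hypothetical stand-in for a remote node that hands out entries only when asked. */
    static final class RemoteSource {
        private final Iterator<Integer> entries; // retained between requests
        int requests = 0;                        // number of batch requests served

        RemoteSource(List<Integer> data) {
            this.entries = data.iterator();
        }

        /** Returns at most batchSize entries, continuing where the last request stopped. */
        List<Integer> nextBatch(int batchSize) {
            requests++;
            List<Integer> batch = new ArrayList<>(batchSize);
            while (batch.size() < batchSize && entries.hasNext()) {
                batch.add(entries.next());
            }
            return batch;
        }
    }

    /**
     * Drains the source one batch at a time.
     * Returns {largest number of entries buffered at once, number of requests made}.
     */
    static int[] drain(List<Integer> data, int batchSize) {
        RemoteSource source = new RemoteSource(data);
        int maxBuffered = 0;
        List<Integer> batch;
        while (!(batch = source.nextBatch(batchSize)).isEmpty()) {
            maxBuffered = Math.max(maxBuffered, batch.size());
            // the user would consume the batch here before more data is requested
        }
        return new int[] {maxBuffered, source.requests};
    }

    public static void main(String[] args) {
        int[] result = drain(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);
        System.out.println("max buffered = " + result[0] + ", requests = " + result[1]);
    }
}
```

The final request returns an empty batch, which is how this sketch signals that the source is exhausted.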
The responses from a remote node are written directly to the output stream so there are no intermediate collections allocated. This means we only have to iterate upon the data once as we retain the iterator between requests. On the originator we still have to store the batches in a collection to be enqueued for the user to pull.
Great care was taken to implement parallel distribution in a way that vastly reduces contention and ensures that we properly follow the batchSize configuration.
When parallel distribution is in use, the new implementation starts 4 remote node requests that share the batch size (so each one gets 1/4). This way we can guarantee that we only hold the desired number of entries irrespective of the number of nodes in the cluster. The old implementation would request batchSize entries from all nodes at the same time. So not only did it reserve a thread for each node, but it could also easily swamp your JVM memory, causing OutOfMemoryErrors (which no one likes). The latter alone made us force the default to be sequential distribution when using an iterator.
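The arithmetic behind that guarantee can be sketched in a few lines. The method names are hypothetical, and how the real implementation rounds or treats very small batch sizes is an assumption here; only the "4 requests sharing one batch" rule comes from the description above.

```java
class BatchShareSketch {
    // From the text: four remote requests share the configured batch size.
    static final int PARALLEL_REQUESTS = 4;

    /** Entries a single remote request may return (rounding is an assumption). */
    static int perRequestShare(int batchSize) {
        return Math.max(1, batchSize / PARALLEL_REQUESTS);
    }

    /** Upper bound on entries in flight with the new scheme, independent of cluster size. */
    static int newUpperBound(int batchSize) {
        return perRequestShare(batchSize) * PARALLEL_REQUESTS;
    }

    /** Old scheme as described above: every queried node is asked for a full batch. */
    static int oldUpperBound(int batchSize, int nodes) {
        return batchSize * nodes;
    }

    public static void main(String[] args) {
        int batchSize = 1000;
        int nodes = 6;
        System.out.println("new: " + newUpperBound(batchSize));
        System.out.println("old: " + oldUpperBound(batchSize, nodes));
    }
}
```

With a batch size of 1000, each request is limited to 250 entries, so the bound stays at 1000 whether the cluster has 6 nodes or 60, whereas the old bound grows with the node count.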
The old implementation would write entries from all nodes (including local) to the same shared queue. The new implementation has a different queue for each request, which allows for faster queues with no locking to be used.
Due to these changes and other isolations between threads, we can now make parallel distribution the default setting for the iterator method. And as you will see this has improved performance nicely.
We have written a JMH test harness specifically for this blog post, comparing the 9.1.5.Final build against the latest 9.2.0.SNAPSHOT. The test runs by default with 4GB of heap and 6 nodes in a distributed cache with 2 owners. It varies the entry count, entry sizes and distributed batch sizes.
Due to the variance in each test, a large number of tests were run with different permutations to make sure they covered a large number of cases. The JMH test that was run can be found on GitHub. All the default settings were used for the run, except that -t4 (runs with 4 worker threads) was provided. This was all run on my measly laptop (i7-4810MQ and 16 GB) - maxing out the CPU was not a hard task.
CAVEAT: The tests don’t do anything with the entries returned by the iterator and just try to pull them as fast as they can. Obviously, if you do a lot of processing between iterations, you will likely not see as large a performance increase.
The full results can be found here. They show each permutation with its operations per second and the difference between the two versions (green shows a gain of 5% or more and red shows a loss of 5% or more).
Specified Distribution Mode
The above 3 rows show a few different ways you could have been invoking the iterator method. The second row is probably by far the most common case. In this case you should see around an 11% increase in performance (results will vary). This is due to the new pulling method as well as parallel distribution becoming the new default running mode. It is unlikely that a user was using the other 2 methods, but they are provided for a more complete view.
If you were specifying a distribution mode manually, either sequential or parallel, you will only see a run that is a few percent faster (3.5%), but every little bit helps! Also, if you can switch to parallel, you may want to think about doing so.
You can also see that if you were already running with rehash disabled, the gains are even larger (14%). That doesn’t even include the fact that running without rehash was already 28% faster than running with it (which means it is about 32% faster in general now). So if you can get away with an at-most-once guarantee, disabling rehash will provide the best throughput.
As was mentioned, the new pull-based implementation is not exposed to the user directly. You still interact with the iterator as you normally would. We should remedy this at some point.
We would love to eventually expose a method to return a Publisher directly to the user so that they can get the full benefits of having a pull based implementation underneath.
This way any intermediate operations applied to the stream before would be distributed and anything applied to the Publisher would be done locally. And just like the iterator method this publisher would be fully rehash aware if you have it configured to do so and would make sure you get all entries delivered in an exactly once fashion (rehash disabled guarantees at most once).
Another side benefit is that the Subscriber methods could be called on different threads, so there is no overhead required on the Infinispan side for coordinating these into queue(s). Thus the Subscriber should be able to retrieve all entries faster than an iterator can.
Also, many of you may be wondering why we aren’t using the new Flow API introduced in Java 9. Luckily, the Flow API is a 1:1 conversion of Reactive Streams. So whenever Infinispan starts supporting Java 9 interfaces/classes, we hope to properly expose these as the JDK classes.
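For readers who haven’t used the Flow API yet, here is a small sketch of what a pull-based Subscriber looks like with the JDK classes alone (Java 9 or later). It does not use any Infinispan API: SubmissionPublisher simply stands in for whatever Publisher might be exposed in the future, and BatchingSubscriber and publishAndCollect are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowPullSketch {
    /** Subscriber that requests batchSize items at a time instead of an unbounded amount. */
    static final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
        private final int batchSize;
        private final List<T> received = Collections.synchronizedList(new ArrayList<>());
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            remainingInBatch = batchSize;
            s.request(batchSize); // pull the first batch
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            if (--remainingInBatch == 0) { // batch consumed, pull the next one
                remainingInBatch = batchSize;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes all items and returns them in the order the subscriber received them. */
    static <T> List<T> publishAndCollect(List<T> items, int batchSize) {
        BatchingSubscriber<T> subscriber = new BatchingSubscriber<>(batchSize);
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once all submitted items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of(1, 2, 3, 4, 5, 6, 7), 3));
    }
}
```

The Subscriber controls the pace through request(n), which is the same back-pressure contract the Reactive Streams interfaces define.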
With Infinispan 9.3, we hope to introduce data container and cache store segment aware iteration. This means when iterating over either we would only have to process entries that map to a given segment. This should reduce the time and processing for iteration substantially, especially for cache stores. Keep your eyes out for a future blog post detailing these as 9.3 development commences.
We hope you find a bit more performance when working with your distributed iteration. We also value any feedback on what you want our APIs to look like, as well as reports of any bugs you find. As always, let us know at any of the places listed here.
Tags: performance streams distribution iteration
Thursday, 01 February 2018
Infinispan has quite a few spectacular ways of executing code in the grid. But I bet you haven’t heard of them or aren’t really familiar with them, which is disappointing. I hope to fix this, however, as we have added more information to the user guide and wanted to detail that here in this blog.
As I am sure you are aware, Infinispan can be used in embedded (in your JVM) and remote (in a standalone server) modes. Unfortunately, this means there are different ways of executing code depending on which mode you are in.
The embedded mode has the most features available and is the easiest to use. The appropriate section can be found here.
One question that seems to come up more than others is how a user can perform cache operations on all data, such as remove all elements that match a given filter. If you are curious about this one, you should check out the Examples section with the example named "Remove specific entries" as it details how a user would do exactly that.
I should also point out the new Cluster Executor section. Just as Streams replaced Map Reduce, Cluster Executor is here to replace the old Distributed Executor. With Cluster Executor and Distributed Streams there is a clearer distinction between executing code on nodes (Cluster Executor) and executing code based on data (Distributed Streams).
The server is a bit more interesting and usually requires configuration ahead of time, unlike Embedded. It can be found in this section. The benefit of the server is most of these can invoke embedded operations internally.
Scripting is by far the easiest to use - just insert your script and execute - but has some limitations that we haven’t been able to fix yet.
Server tasks can run pretty much any Java code but require registering classes beforehand. Unfortunately, this section still needs to be filled in and should be added sometime in the near future. Until then, if you are interested, you can look at some tests on GitHub.
I hope this has helped you find more information about the various ways of executing arbitrary code against your data. If you have any questions or need more clarification about the features highlighted here, please don’t hesitate to let us know at any of these places.
Tags: distributed executors streams
Monday, 03 July 2017
In Infinispan 8.0 we were very excited to announce Distributed Streams as we moved to Java 8. This feature allows any of the various java.util.stream.Stream operations to be applied to the data grid in a distributed fashion. It provides the highest possible performance because data is processed on the node where it lives, and only the intermediate results of the terminal operation have to be returned to the invoker.
One problem with distributed streams though is that data is processed without acquiring locks: great for performance, but there is no guarantee that some other process isn’t concurrently modifying the cache entry you’re working on. Consider the following example which iterates through the entire contents of a cache, modifying each entry based on its existing value:
This works great until you have another cache put() running concurrently that changes a value. In this case the only way to be sure that an update is applied properly is to perform an optional update in the forEach. In a transactional cache you could also lock the entry manually (pessimistic) or retry on a WriteSkewException (optimistic). For example this is how the optional update could be performed.
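A minimal sketch of the difference, written against a plain ConcurrentMap rather than a running cluster (Cache extends ConcurrentMap, so replace(key, oldValue, newValue) is available on both). The method names and the Runnable that simulates the concurrent put() are hypothetical, and the sketch assumes the key is already present.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConditionalUpdateSketch {
    /** Read-modify-write without any check: a write in between is silently overwritten. */
    static void blindUpdate(ConcurrentMap<String, String> map, String key, Runnable concurrentWrite) {
        String current = map.get(key);
        concurrentWrite.run(); // simulates another put() landing between our read and write
        map.put(key, current + "-modified");
    }

    /** Retries until replace(key, old, new) succeeds, so a concurrent write is never lost. */
    static void conditionalUpdate(ConcurrentMap<String, String> map, String key, Runnable concurrentWrite) {
        boolean interfered = false;
        while (true) {
            String current = map.get(key);
            if (!interfered) {
                concurrentWrite.run(); // simulate the concurrent put() once
                interfered = true;
            }
            // only succeeds if the value is still the one we read
            if (map.replace(key, current, current + "-modified")) {
                return;
            }
        }
    }

    /** Runs one of the two variants against a map where another writer sets "v2". */
    static String demo(boolean conditional) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("k", "v1");
        Runnable otherWriter = () -> map.put("k", "v2");
        if (conditional) {
            conditionalUpdate(map, "k", otherWriter);
        } else {
            blindUpdate(map, "k", otherWriter);
        }
        return map.get("k");
    }

    public static void main(String[] args) {
        System.out.println("blind:       " + demo(false));
        System.out.println("conditional: " + demo(true));
    }
}
```

The blind variant ends with a value derived from "v1", so the concurrent write of "v2" is lost. The conditional variant notices the change, re-reads, and applies the modification on top of "v2".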
As you can see the code isn’t as pretty as it was before, but is still pretty concise.
Infinispan 9.1 introduces locked streams, which allow you to run your operation knowing that another update cannot be performed while the Consumer is running. Note that this only works in non-transactional and pessimistic transactional caches (optimistic transactional caches are not supported).
As you may notice, the code looks very similar to the first example. You just have to invoke the lockedStream method on the AdvancedCache, and then you can use the stream knowing that data for the given key won’t change while performing your update.
This locked stream has a slightly limited API compared to the normal java.util.stream API. Only the filter method is allowed in addition to forEach. The CacheStream API is also supported, with a few exceptions. For more details on the API and what methods are supported you should check out the Javadoc.
The lock is only acquired for the given key while invoking the Consumer, allowing other updates on other keys to be performed concurrently, just like a normal put operation would behave. It is not suggested to perform operations on other keys in the Consumer, as this could cause possible deadlocks.
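As a purely local analogy (this is not the locked stream API), ConcurrentHashMap.compute behaves in a similar spirit: the function runs atomically for one key while other keys remain free, and its Javadoc likewise warns against updating other mappings from inside the function. The class and method names below are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerKeyLockSketch {
    /** Several threads increment the same key; compute() keeps each update atomic. */
    static int concurrentIncrements(int threads, int incrementsPerThread) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("counter", 0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // the function sees a stable value for this key while it runs
                    map.compute("counter", (key, value) -> value + 1);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map.get("counter");
    }

    public static void main(String[] args) {
        System.out.println(concurrentIncrements(4, 1000));
    }
}
```

No increment is lost because the read and the write happen under the same per-key guard, which is the property a locked stream gives you across the cluster.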
Now go forth and perform operations using the data stream knowing that the data underneath has not changed!
Friday, 07 April 2017
During the talk we presented a couple of small demos that showcased some in-memory data grid use cases. The demos are located here, but I thought it’d be useful to provide some step-by-step instructions so that you can get them running as quickly as possible.
Before we start with any of the demos, it’s necessary to run some set up steps:
1. Check out git repository:
2. Download Infinispan Server 9.0.0.Final and extract it at the same level as the git repository.
3. Go into the datagrid-patterns directory, start the servers and wait until they’ve started:
cd datagrid-patterns
./run-servers.sh
4. Install Anaconda for Python 3; this is required to run Jupyter notebook for plotting.
5. Install Maven 3.
Once the set up is complete, it’s time to start with the individual demos.
Both demos shown below work with the same application domain: rail transport systems. In this domain, we differentiate between physical stations, trains, station boards which are located in stations, and finally stops, which are individual entries in station boards.
The first demo is focused on how you can use Infinispan for doing offline analytics. In particular, this demo tries to answer the following question:
Q. What is the time of the day when there is the biggest ratio of delayed trains?
To answer this question, Infinispan data grid will be loaded with 3 weeks worth of data from station boards. Once the data is loaded, we will execute a remote server task which will use Infinispan Distributed Java Streams to calculate the two pieces of information required to answer the question: per hour, how many trains are going through the system, and out of those, how many are delayed.
An important aspect to bear in mind about this server task is that it will only be executed on one of the nodes in the cluster. It does not matter which one. In turn, this node will ship the lambdas required to do the computation to each of the nodes so that they can be executed against their local data. The other nodes will reply with the results, and the node where the server task was invoked will aggregate them.
Then, these results are sent back to the client, which in turn, stores the results as JSON in an intermediate cache. Once the results are in place, we will use a Jupyter notebook to read those results and plot the result.
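The per-hour computation described above can be sketched with plain Java streams. This runs on a local list rather than in the data grid, and the Stop class, its fields and the sample data are hypothetical stand-ins for the demo’s real domain classes.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class DelayRatioSketch {
    /** Hypothetical, simplified station board entry. */
    static final class Stop {
        final int hour;
        final boolean delayed;

        Stop(int hour, boolean delayed) {
            this.hour = hour;
            this.delayed = delayed;
        }
    }

    /** Counts stops per hour, optionally only the delayed ones. */
    static Map<Integer, Long> perHour(List<Stop> stops, boolean delayedOnly) {
        return stops.stream()
                .filter(s -> !delayedOnly || s.delayed)
                .collect(Collectors.groupingBy(s -> s.hour, TreeMap::new, Collectors.counting()));
    }

    /** Returns the hour with the highest ratio of delayed stops to all stops. */
    static int worstHour(List<Stop> stops) {
        Map<Integer, Long> total = perHour(stops, false);
        Map<Integer, Long> delayed = perHour(stops, true);
        return total.keySet().stream()
                .max(Comparator.comparingDouble(
                        (Integer h) -> delayed.getOrDefault(h, 0L) / total.get(h).doubleValue()))
                .orElseThrow(() -> new IllegalArgumentException("no stops"));
    }

    /** Tiny made-up data set: half of the 2am stops are delayed. */
    static int demo() {
        List<Stop> stops = Arrays.asList(
                new Stop(2, true), new Stop(2, false),
                new Stop(8, true), new Stop(8, false), new Stop(8, false), new Stop(8, false),
                new Stop(17, false), new Stop(17, false), new Stop(17, false));
        return worstHour(stops);
    }

    public static void main(String[] args) {
        System.out.println("worst hour: " + demo());
    }
}
```

In the demo itself, the two per-hour maps are what the server task computes with distributed streams, and the ratio is what the notebook ends up plotting.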
Let’s see these steps in action:
1. First, we need to install the server tasks in the running servers above:
mvn clean install package -am -pl analytics-server
mvn wildfly:deploy -pl analytics-server
2. Open the datagrid-patterns repo with your favourite IDE and run the delays.java.stream.InjectApp class located in the analytics/analytics-server project. This will inject the data into the cache. In my environment, it takes between 1 and 2 minutes.
3. With the data loaded, we need to run the remote task that will calculate the total number of trains per hour and how many of those are delayed. To do that, execute delays.java.stream.AnalyticsApp class located in analytics/analytics-server project from your IDE.
4. You can verify that the results have been calculated by going to the following address:
5. With the results in place, it’s time to start the Jupyter notebook:
6. Once the notebook opens, open the live-demo.ipynb notebook and execute each of the cells in order. You should end up seeing a plot like this:
So, the answer to the question:
Q. What is the time of the day when there is the biggest ratio of delayed trains?
is 2am! That’s because last connecting trains of the day wait for each other to avoid leaving passengers stranded.
The second demo that we presented uses the same application domain as above, but this time we’re trying to use our data grid as a way of storing the station board state of each station at a given point in time. So, the idea is to use Infinispan as an in-memory data grid for working with real-time data.
So, what can we do with this type of data? In our demo, we will create a centralised dashboard of delayed trains around the country. To do that, we will take advantage of Infinispan’s Continuous Query functionality, which allows us to find those station boards that contain delayed stops; as new delayed trains appear, they will be pushed to our dashboard.
To run this demo, keep the same servers running as above and do the following:
Run delays.query.continuous.FxApp application located in real-time project inside the datagrid-patterns demo. This app will inject some live station board data and will launch a JavaFX dashboard that shows delayed trains as they appear. It should look something like this:
This has been a summary of the demos that we ran in our talk at Devoxx France, with the intention of getting you running these demos as quickly as possible. The repository contains more detailed information about these demos. If anything is unclear or any of the instructions above are not working, please let us know!
Thanks to Emmanuel Bernard for partnering with me for this Devoxx France talk and for the continuous feedback while developing the demos. Thanks as well to Tristan Tarrant for the input in the demos and many thanks to all Devoxx France attendees who attended our talk :)
A very special thanks to Alexandre Masselot whose "Swiss Transport in Real Time: Tribulations in the Big Data Stack" talk at Soft-Shake 2016 was the inspiration for these demos. @Alex, thanks a lot for sharing the demos and data with me and the rest of the community!!
In just a few weeks I’ll be at Great Indian Developer Summit presenting these demos and much more! Stay tuned :)
Tags: conference devoxx demo streams query
Tuesday, 06 December 2016
As I hope most people reading this already know, since Infinispan 8 you can utilize the entire Java 8 Stream API and have it be distributed across your cluster. This performs the various intermediate and terminal operations on the data local to the node it lives on, providing for extreme performance. There are some limitations and things to know as was explained at distributed-streams.
The problem with the API up to now was that, if you wanted to use lambdas, it was quite an ugly scene. Take for example the following code snippet:
However, for Infinispan 9 we utilize a little syntax feature added with Java 8 to add some much needed quality of life improvements. When a method is overloaded, the compiler chooses the most specific interface. This allows for a neat interaction when we add some new interfaces that extend both Serializable and the various function interfaces (SerializableFunction, SerializablePredicate, SerializableSupplier, etc). All of the Stream methods have been overloaded on the CacheStream interface to take these arguments.
This allows for the code to be much cleaner as we can see here:
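The overload trick can be reproduced with the JDK alone. In this sketch the SerializableFunction interface and the two map methods are hypothetical stand-ins defined locally, not the Infinispan types; the point is only which overload the compiler selects for a bare lambda.

```java
import java.io.Serializable;
import java.util.function.Function;

class OverloadSketch {
    /** Local stand-in for an interface that is both a Function and Serializable. */
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
    }

    /** Overload taking the plain JDK interface. */
    static String map(Function<String, Integer> f) {
        return "Function";
    }

    /** Overload taking the more specific, serializable interface. */
    static String map(SerializableFunction<String, Integer> f) {
        return "SerializableFunction";
    }

    /** A bare lambda fits both overloads, so the most specific one is chosen. */
    static String viaLambda() {
        return map(s -> s.length());
    }

    /** A value already typed as Function can only match the plain overload. */
    static String viaTypedVariable() {
        Function<String, Integer> f = s -> s.length();
        return map(f);
    }

    public static void main(String[] args) {
        System.out.println(viaLambda());
        System.out.println(viaTypedVariable());
    }
}
```

Because the bare lambda resolves to the serializable overload, no cast is needed at the call site, which is what makes the calling code so much shorter.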
This is not the only benefit of providing the CacheStream interface: we can also provide new methods that aren’t available on the standard Stream interface. One example is the forEach method which allows the user to more easily provide a Cache that is injected on each node as required. This way you don’t have to use the clumsy CacheAware interface and can directly use lambdas as desired.
Here is an example of the new forEach method in action:
In this example we take a cache and, based on the keys in it, write those values into another cache. Since forEach doesn’t have to be side effect free, you can do whatever you want inside here.
All in all these improvements should make using Distributed Streams with Infinispan much easier. The extra methods could be extended further if users have use cases they would love to suggest. Just let us know, and I hope you enjoy using Infinispan!
Monday, 01 February 2016
The recording of the talk presented last Wednesday in London is available! Thanks to everyone who joined and to the London JBUG organizers for the awesome new venue!
Tags: functional presentations spark hadoop streams video
Monday, 07 September 2015
Now that Infinispan supports Java 8, we can take full advantage of some of the new features. One of the big features of Java 8 is the new Stream classes. This turns data processing on its head: instead of having to iterate over the data yourself, the underlying Stream handles that and you just provide the operations to perform on it. This lends itself well to distributed processing, as the iteration is handled entirely by the implementation (in this case Infinispan).
I am therefore glad to introduce Distributed Streams for Infinispan 8! This allows any operation you can perform on a regular Stream to also be performed on a distributed cache (assuming the operation and data are marshallable).
When using a distributed or replicated cache, the keys and values of the cache must be marshallable. This is the same case for intermediate and terminal operations when using the distributed streams. Normally you would have to provide an instance of some new class that is either Serializable or has an Externalizer registered for it as described in the marshallable section of the user guide.
However, Java 8 also introduced lambdas, which can be defined as serializable very easily (although it is a bit awkward). An example of this serialization can be found here.
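As a small illustration of that awkward syntax, the sketch below declares a lambda with an intersection cast and sends it through plain Java serialization. Java serialization merely stands in for the cache marshaller here, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class SerializableLambdaSketch {
    /** Serializes a lambda, reads it back, and applies the copy to the input. */
    static int roundTrip(String input) {
        // The intersection cast tells the compiler to make this lambda serializable.
        Function<String, Integer> length =
                (Function<String, Integer> & Serializable) s -> s.length();
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(length);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                Function<String, Integer> copy = (Function<String, Integer>) in.readObject();
                return copy.apply(input);
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Without the cast, writeObject would fail with a NotSerializableException, since an ordinary lambda does not implement Serializable.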
Some of you may also be aware of the Collectors class, which is used with the collect method on a stream. Unfortunately, none of the Collectors it produces can be marshalled. As such, Infinispan has added a utility class that can work in conjunction with the Collectors class. This allows you to still use any combination of the Collectors classes and have everything work properly when marshalling is required.
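One way such a utility could work is to ship a serializable supplier of the Collector instead of the Collector itself, and to create the real Collector lazily on whichever node uses it. The sketch below shows that idea with the JDK only; DeferredCollector and SerializableSupplier are hypothetical local types, and this is an assumption about the approach, not the Infinispan source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class SerializableCollectorSketch {
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {
    }

    /** Serializable Collector that builds the real Collector on first use. */
    static final class DeferredCollector<T, R> implements Collector<T, Object, R>, Serializable {
        private final SerializableSupplier<Collector<T, ?, R>> factory;
        private transient Collector<T, Object, R> delegate; // never serialized

        DeferredCollector(SerializableSupplier<Collector<T, ?, R>> factory) {
            this.factory = factory;
        }

        @SuppressWarnings("unchecked")
        private Collector<T, Object, R> delegate() {
            if (delegate == null) {
                delegate = (Collector<T, Object, R>) factory.get();
            }
            return delegate;
        }

        @Override public Supplier<Object> supplier() { return delegate().supplier(); }
        @Override public BiConsumer<Object, T> accumulator() { return delegate().accumulator(); }
        @Override public BinaryOperator<Object> combiner() { return delegate().combiner(); }
        @Override public Function<Object, R> finisher() { return delegate().finisher(); }
        @Override public Set<Collector.Characteristics> characteristics() {
            return delegate().characteristics();
        }
    }

    /** Serializes the wrapper, reads it back, and counts the words with the copy. */
    static long roundTripCount(List<String> words) {
        Collector<String, Object, Long> original =
                new DeferredCollector<String, Long>(() -> Collectors.counting());
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                Collector<String, Object, Long> copy =
                        (Collector<String, Object, Long>) in.readObject();
                return words.stream().collect(copy);
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripCount(List.of("a", "b", "c")));
    }
}
```

Only the supplier lambda travels over the wire, so any combination of the standard Collectors factories can be used inside it.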
Java 8 streams naturally have a sense of parallelism. That is, a stream can be marked as parallel, which in turn allows its operations to be performed in parallel using multiple threads. The best part is how simple it is to do. The stream can be made parallel when first retrieving it by invoking parallelStream, or you can enable it after the Stream is retrieved by invoking parallel.
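Both ways of enabling parallelism look like this on a plain JDK collection (the class and method names are just for illustration):

```java
import java.util.List;
import java.util.stream.Stream;

class ParallelSketch {
    /** Sums the squares of the values using either way of obtaining a parallel stream. */
    static long sumSquares(List<Integer> values, boolean parallelFromStart) {
        Stream<Integer> stream = parallelFromStart
                ? values.parallelStream()      // parallel when first retrieved
                : values.stream().parallel();  // switched to parallel afterwards
        return stream.mapToLong(v -> (long) v * v).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 3, 4);
        System.out.println(sumSquares(values, true));
        System.out.println(sumSquares(values, false));
    }
}
```

Both variants give the same result; the only difference is the point at which the stream is marked as parallel.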
The new Distributed Streams from Infinispan take this one step further with what I am calling parallel distribution. That is, since data is already partitioned across nodes, we can also allow operations to be run simultaneously on different nodes. This option is enabled by default. However, it can be controlled by using the new CacheStream interface discussed just below. Also, to be clear, the Java 8 parallel option can be used in conjunction with parallel distribution. This just means you will have concurrent operations running on multiple nodes across multiple threads on each node.
There is a new interface, CacheStream, that allows for controlling additional options when using a Distributed Stream. I am highlighting the added methods (note that comments have been removed from the gist).
The distributedBatchSize method controls how many elements are brought back at one time for operations that are key aware. These operations are (spl)iterator and forEach. This is useful to tweak how many keys are held in memory from a remote node. Thus it is a tradeoff of performance (more keys) versus memory. This defaults to the chunk size as configured by state transfer.
Parallel distribution was discussed in the parallelism section above and is toggled with the parallelDistribution and sequentialDistribution methods. Note that all commands have it enabled by default except for the (spl)iterator methods.
The filterKeys method can be used to have the distributed stream only operate on a given set of keys. This is done in a very efficient way, as it will only perform the operation on the node(s) that own the given keys. Using a given set of keys also allows for constant access time from the data container/store, as the cache doesn’t have to look at every single entry in the cache.
The filterKeySegments method is useful for filtering entries in a more performant way. Normally, you could use the filter intermediate operation, but this method is applied before any of the operations are performed, to most efficiently limit the entries that are presented for stream processing. For example, if only a subset of segments is required, it may not have to send a remote request.
Similar to the previous method, the segmentCompletionListener method is related to key segments. This listener allows the end user to be notified when a segment has been completely processed. This can be useful if you want to keep track of completion so that, if this node goes down, you can rerun the processing with only the unprocessed segments. Currently, this listener is only supported for the (spl)iterator methods.
By default, all stream operations are what is called rehash aware. That is if a node joins or leaves the cluster while the operation is in progress the cluster will be aware of this and ensure that all data is processed properly with no loss (assuming no data was actually lost).
This can be disabled by calling disableRehashAware; however, if a rehash occurs in the middle of the operation, it is possible that not all data will be processed. It should be noted that data is never processed multiple times with this disabled; only a loss of data can occur.
This option is not normally recommended unless you have a situation where you can afford to only operate on a subset of data. The tradeoff is that the operation can perform faster, especially (spl)iterator and forEach methods.
The age-old example of map/reduce is always word count. Streams allow you to do that as well! Here is an equivalent word count example, assuming you have a Cache containing String keys and values and you want the count of all words in the values. Some of you may be wondering how this relates to our existing map/reduce framework. The plan is to deprecate the existing Map/Reduce and replace it completely with the new distributed streams at a later point.
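A sketch of the word count pipeline, run here against a plain Map so that it works without a cluster. The class and method names are hypothetical. On a distributed cache the same pipeline would additionally need its lambdas and Collector to be marshallable, as described in the sections above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class WordCountSketch {
    /** Counts how often each whitespace-separated word occurs across all values. */
    static Map<String, Long> wordCount(Map<String, String> cache) {
        return cache.entrySet().parallelStream()
                .map(e -> e.getValue().split("\\s+")) // map: value -> words
                .flatMap(Arrays::stream)
                .filter(word -> !word.isEmpty())
                // reduce: group identical words and count them
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("1", "hello world");
        cache.put("2", "hello infinispan");
        System.out.println(wordCount(cache));
    }
}
```

The map and flatMap steps play the role of the mapper, and the grouping Collector plays the role of the reducer.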
Remember though that distributed streams can do so much more than just map/reduce. And there are a lot of examples already out there for streams. To use the distributed streams, you just need to make sure your operations are marshallable, and you are good to go.
Here are a few pages with examples of how to use streams straight from Oracle:
I hope you enjoy Distributed Streams. We hope they change how you interact with your data in the cluster!
Let us know what you think, and tell us about any issues or use cases you would love to share!
Tags: java 8 streams API