Step 10: Temperature averages with streams

Now that Infinispan is storing some interesting data, we might want to perform some computation on it. In our weather application, let's compute the average temperature for every country. One simplistic approach would be to have a single node perform the computation by iterating over the whole dataset. This, however, has two downsides: it requires fetching the data from the respective owners (which might mean going over the slow network instead of reading from fast local memory), and it only takes advantage of the horsepower of one node while we have a whole cluster at our disposal. What a waste!

Infinispan offers a solution to both problems: using the powerful concepts of distributed execution and distributed streams, we can use all the available nodes to perform the computation locally, right where the data is, and then send the partial results to a master node for aggregation. Let's see how we can use streams to compute the average temperature per country.
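To make the "compute locally, aggregate on one node" idea concrete, here is a minimal sketch in plain Java with no Infinispan involved. The two "nodes" are just lists, and every name in it (PartialAverageSketch, SumCount, nodeA, nodeB) is hypothetical, as are the readings. It is not how Infinispan implements distributed streams; it only shows why each node should send back a sum and a count rather than a finished average.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartialAverageSketch {

    // Partial result a node could send back: a running sum and a count.
    static final class SumCount {
        double sum;
        long count;
        void add(double value) { sum += value; count++; }
        void merge(SumCount other) { sum += other.sum; count += other.count; }
        double average() { return count == 0 ? 0.0 : sum / count; }
    }

    // Small helper to build a (country, temperature) reading.
    static Map.Entry<String, Double> reading(String country, double temperature) {
        return new AbstractMap.SimpleEntry<>(country, temperature);
    }

    // Made-up data: what each simulated node happens to own.
    static List<Map.Entry<String, Double>> nodeA() {
        return Arrays.asList(reading("Italy", 4.0), reading("Italy", 6.0), reading("UK", 3.0));
    }

    static List<Map.Entry<String, Double>> nodeB() {
        return Arrays.asList(reading("Italy", 8.0));
    }

    // Local step: aggregate only the entries one node owns.
    static Map<String, SumCount> localAggregate(List<Map.Entry<String, Double>> owned) {
        Map<String, SumCount> partial = new HashMap<>();
        for (Map.Entry<String, Double> e : owned) {
            partial.computeIfAbsent(e.getKey(), k -> new SumCount()).add(e.getValue());
        }
        return partial;
    }

    // Aggregation step: merge the partial results and only then divide.
    static Map<String, Double> combine(List<Map<String, SumCount>> partials) {
        Map<String, SumCount> total = new HashMap<>();
        for (Map<String, SumCount> partial : partials) {
            partial.forEach((country, sc) ->
                    total.computeIfAbsent(country, k -> new SumCount()).merge(sc));
        }
        Map<String, Double> averages = new TreeMap<>();
        total.forEach((country, sc) -> averages.put(country, sc.average()));
        return averages;
    }

    static Map<String, Double> sampleAverages() {
        return combine(Arrays.asList(localAggregate(nodeA()), localAggregate(nodeB())));
    }

    public static void main(String[] args) {
        sampleAverages().forEach((country, avg) ->
                System.out.println("Average temperature in " + country + " is " + avg));
    }
}
```

With these readings, Italy averages (4 + 6 + 8) / 3 = 6.0. Averaging the two per-node averages (5.0 and 8.0) would give 6.5 instead, which is why the partial results carry sums and counts.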

To compute this, we need to group the locations by country and then average the temperatures within each group. This is quite simple with the Streams API introduced in Java 8: we can use the collect method together with the Collectors class to perform both operations. This is what that would look like:

public void computeCountryAverages() {
   Map<String, Double> averages = cache.entrySet().stream()
           .collect(Collectors.groupingBy(e -> e.getValue().country,
                   Collectors.averagingDouble(e -> e.getValue().temperature)));
}

As you can see, we first group the entries by country and then average the temperatures within each country, just like we wanted! And it requires very little code.
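If you want to try the collector on its own, here is a minimal sketch that runs against a plain java.util.Map, with no cluster required. The LocationWeather class below is a simplified, hypothetical stand-in for the tutorial's value type (it only has the two fields used above), and the sample readings are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class LocalAveragesSketch {

    // Hypothetical stand-in for the value type stored in the cache.
    static class LocationWeather {
        final float temperature;
        final String country;

        LocationWeather(float temperature, String country) {
            this.temperature = temperature;
            this.country = country;
        }
    }

    // Same pipeline as above, applied to an ordinary in-memory map.
    static Map<String, Double> averagesByCountry(Map<String, LocationWeather> data) {
        return data.entrySet().stream()
                .collect(Collectors.groupingBy(e -> e.getValue().country,
                        Collectors.averagingDouble(e -> e.getValue().temperature)));
    }

    // Made-up sample readings, keyed by location.
    static Map<String, LocationWeather> sampleData() {
        Map<String, LocationWeather> data = new LinkedHashMap<>();
        data.put("Rome, Italy", new LocationWeather(6.0f, "Italy"));
        data.put("Como, Italy", new LocationWeather(5.0f, "Italy"));
        data.put("London, UK", new LocationWeather(3.0f, "UK"));
        return data;
    }

    public static void main(String[] args) {
        averagesByCountry(sampleData()).forEach((country, avg) ->
                System.out.println("Average temperature in " + country + " is " + avg));
    }
}
```

Here groupingBy builds one bucket per country, and averagingDouble is the downstream collector applied to each bucket.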

Unfortunately, the above code doesn't work for a distributed cache like ours. The collector has to be shipped to each node so that it can be applied there, which means it must be serializable, and the collectors produced by the Collectors class are not. Infinispan therefore provides a bridge class, aptly named CacheCollectors, which lets the user supply a serializable Supplier of a Collector instead. It can be used as follows:

public void computeCountryAverages() {
   Map<String, Double> averages = cache.entrySet().stream()
           .collect(CacheCollectors.serializableCollector(() -> Collectors.groupingBy(e -> e.getValue().country,
                   Collectors.averagingDouble(e -> e.getValue().temperature))));
}

The above code serializes the supplier of the collector and sends it to the other nodes, where each node uses it to create the non-serializable Collector locally. This allows the operations to be executed on the node that owns each entry.
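The supplier trick can be demonstrated with nothing but the JDK. The sketch below is an illustration under assumptions, not Infinispan's implementation: SerializableSupplier here is a hypothetical interface declared locally, the "wire" is just a byte array produced by standard Java serialization, and the readings are made up. It shows that the collector itself is not Serializable, while a lambda that creates it can be serialized, restored, and then asked for a fresh collector.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class SerializableSupplierSketch {

    // Hypothetical local interface: a Supplier whose lambdas can be serialized.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    static boolean isSerializable(Object o) {
        return o instanceof Serializable;
    }

    static byte[] serialize(Object o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(o);
            }
            return bytes.toByteArray();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Made-up (country, temperature) readings.
    static List<Map.Entry<String, Double>> sampleReadings() {
        return Arrays.asList(
                new AbstractMap.SimpleEntry<String, Double>("Italy", 6.0),
                new AbstractMap.SimpleEntry<String, Double>("Italy", 5.0),
                new AbstractMap.SimpleEntry<String, Double>("UK", 3.0));
    }

    @SuppressWarnings("unchecked")
    static Map<String, Double> averagesAfterRoundTrip(List<Map.Entry<String, Double>> readings) {
        // Only the supplier is serializable; the collector is created on demand.
        SerializableSupplier<Collector<Map.Entry<String, Double>, ?, Map<String, Double>>> supplier =
                () -> Collectors.groupingBy(e -> e.getKey(),
                        Collectors.averagingDouble(e -> e.getValue()));

        // Simulate shipping the supplier to another node and restoring it there.
        byte[] wire = serialize(supplier);
        SerializableSupplier<Collector<Map.Entry<String, Double>, ?, Map<String, Double>>> received =
                (SerializableSupplier<Collector<Map.Entry<String, Double>, ?, Map<String, Double>>>)
                        deserialize(wire);

        // The "remote" side builds the collector locally and applies it.
        return readings.stream().collect(received.get());
    }

    public static void main(String[] args) {
        System.out.println("Collector serializable? " + isSerializable(Collectors.toList()));
        System.out.println(averagesAfterRoundTrip(sampleReadings()));
    }
}
```

Note that the lambdas nested inside the supplier do not need to be serializable themselves: they are only created when the restored supplier is invoked.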

That's it! Let's see it at work:

$ git checkout -f step-10
$ mvn clean package exec:exec # on terminal 1
$ mvn exec:exec # on terminal 2

Here is the trimmed output:

Coordinator Node Output
---- Average country temperatures ----
Average temperature in Canada is -11.0° C
Average temperature in USA is 5.8° C
Average temperature in Romania is 7.0° C
Average temperature in UK is 3.0° C
Average temperature in Italy is 5.5° C
Average temperature in Portugal is 13.6° C
Average temperature in Switzerland is -0.1° C

Now that we have a fully working application that actually does something useful, we might want to run it in production, so we need a simpler way to change the configuration according to the environment. On to the next step.
