Don't touch my data stream!

In Infinispan 8.0 we were very excited to announce Distributed Streams as we moved to Java 8. This feature lets you apply any of the stream operations to the data grid in a distributed fashion. It provides the highest possible performance because data is processed on the node where it lives, and only the intermediate results of the terminal operation need to be returned to the invoker.

One problem with distributed streams, though, is that data is processed without acquiring locks. This is great for performance, but there is no guarantee that some other process isn’t concurrently modifying the cache entry you’re working on. Consider the following example, which iterates through the entire contents of a cache, modifying each entry based on its existing value:
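A minimal sketch of such an iteration, assuming a `Cache<String, String>` and a hypothetical "-updated" suffix as the modification (the class and method names are made up for illustration):

```java
import org.infinispan.Cache;

public class UnlockedUpdate {
    // Appends a suffix to every value in the cache.
    // No lock is held between reading e.getValue() and calling put(),
    // so a concurrent write to the same key can be silently overwritten.
    public static void appendSuffix(Cache<String, String> cache) {
        cache.entrySet().stream()
             .forEach((c, e) -> c.put(e.getKey(), e.getValue() + "-updated"));
    }
}
```

The two-argument forEach hands the Consumer both the local Cache instance and the entry, so the update is issued from the node that owns the data.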

This works great until another cache put() runs concurrently and changes a value. In that case the only way to be sure that an update is applied properly is to perform a conditional update in the forEach, that is, a replace that succeeds only if the value is still the one you read. In a transactional cache you could also lock the entry manually (pessimistic) or retry on a WriteSkewException (optimistic). For example, this is how the conditional update could be performed.
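A minimal sketch of the retry loop behind such an update, written against `java.util.concurrent.ConcurrentMap` so it runs without a cluster. The helper name `replaceUntilApplied` and the "-updated" suffix are assumptions for illustration, not part of the Infinispan API:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.UnaryOperator;

public class ConditionalUpdate {
    // Retries replace(key, expected, updated) until it succeeds, re-reading
    // the value whenever another writer changed it in between.
    // Returns the value written, or null if the key no longer exists.
    public static <K, V> V replaceUntilApplied(ConcurrentMap<K, V> map, K key,
                                               UnaryOperator<V> update) {
        V current = map.get(key);
        while (current != null) {
            V updated = update.apply(current);
            if (map.replace(key, current, updated)) {
                return updated;
            }
            current = map.get(key); // lost the race: reload and try again
        }
        return null; // entry was removed concurrently; nothing to update
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("k", "v");
        replaceUntilApplied(map, "k", v -> v + "-updated");
        System.out.println(map.get("k")); // prints "v-updated"
    }
}
```

Because Infinispan’s `Cache` extends `ConcurrentMap`, the same loop can be run inside the forEach against the Cache instance that is passed to the Consumer.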

As you can see, the code isn’t as pretty as it was before, but it is still fairly concise.

Infinispan 9.1 introduces locked streams, which allow you to run your operation knowing that no other update to the entry can be performed while the Consumer is running. Note that this only works in non-transactional and pessimistic transactional caches (optimistic transactional caches are not supported).
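A minimal sketch of the same update using a locked stream, again assuming a `Cache<String, String>` and a hypothetical "-updated" suffix (the class and method names are made up for illustration):

```java
import org.infinispan.Cache;

public class LockedUpdate {
    // Appends a suffix to every value in the cache.
    // The lock for each key is held while the Consumer runs for that entry,
    // so the value read from e.getValue() cannot change before the put().
    public static void appendSuffix(Cache<String, String> cache) {
        cache.getAdvancedCache().lockedStream()
             .forEach((c, e) -> c.put(e.getKey(), e.getValue() + "-updated"));
    }
}
```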

Notice that the code looks very similar to the first example. You just have to invoke the lockedStream method on the AdvancedCache, and then you can use the stream knowing that the data for the given key won’t change while you perform your update.

The locked stream has a slightly limited API compared to the normal Stream API. Of the standard stream methods, only filter and forEach are allowed. The CacheStream methods are also supported, with a few exceptions. For more details on the API and which methods are supported, check out the Javadoc.
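As a rough illustration of combining the two permitted methods, the sketch below filters entries before updating them. The "pending" prefix check and the "-updated" suffix are hypothetical, as are the class and method names:

```java
import org.infinispan.Cache;

public class FilteredLockedUpdate {
    // Updates only the entries whose value starts with "pending".
    // filter() narrows the entries; forEach() then runs the Consumer
    // for each remaining entry while its key is locked.
    public static void updatePending(Cache<String, String> cache) {
        cache.getAdvancedCache().lockedStream()
             .filter(e -> e.getValue().startsWith("pending"))
             .forEach((c, e) -> c.put(e.getKey(), e.getValue() + "-updated"));
    }
}
```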

The lock is only acquired for the given key while the Consumer is invoked, so updates to other keys can still be performed concurrently, just as with a normal put operation. Operating on other keys inside the Consumer is not recommended, as this could cause deadlocks.

Now go forth and perform operations using the data stream knowing that the data underneath has not changed!



Posted by Unknown on 2017-07-03
Tags: streams