Friday, 17 June 2011

So you want JPA-like access to Infinispan?

Back in the early days of Infinispan (since our first public announcement, in fact) we always had it in mind to expose a JPA-like layer on Infinispan.  Initially this was conceived as a replacement for the fine-grained replication that JBoss Cache's POJO Cache variant offered, but it grew beyond just a technique for fine-grained replication of complex object graphs.  The fact that it offered a familiar data storage API to Java developers was big.  Huge, in fact.


So we realised JPA-on-Infinispan was firmly on the roadmap.  The original plan was to implement the entire set of JPA APIs from scratch, but this was a daunting and Herculean task.  After much discussion with core Hibernate architects and Infinispan contributors Emmanuel Bernard and Sanne Grinovero, we decided that rather than implementing all this from scratch, it would serve both Infinispan and the community better to fork Hibernate’s core ORM engine and replace the relational database mappings with key/value store mappings.  This way we get to reuse the mature codebase of Hibernate’s session and transaction management, object graph dehydration code, proxies, etc.

And Hibernate OGM (Object-Grid Mapping) was born.  After initial experiments and even a large-scale public demo at the JBoss World 2011 Keynote, Emmanuel has officially blogged about the launch of Hibernate OGM.  Very exciting times: Infinispan now has a JPA-like layer.  :-)

To reiterate a key point from Emmanuel’s blog, Hibernate OGM is still in its infancy.  It needs community participation to help it grow up and mature.  This is where the Infinispan community should step in; consider Hibernate OGM as Infinispan’s JPA-like layer and get involved.  For more details, please read Emmanuel’s announcement.

Enjoy! Manik

Posted by Manik Surtani on 2011-06-17
Tags: hibernate ogm jpa hibernate API

Wednesday, 19 January 2011

Introducing distributed execution and MapReduce framework

In case you have not been paying attention to the area of large scale distributed computing – there is a revolution going on! It is becoming increasingly evident that the software ecosystems built around so-called Big Data are at the forefront of cloud computing innovation. Unfortunately, there has been more debate around determining how big Big Data actually is than around defining a common set of requirements for large scale Big Data computational platforms.

Stephen O’Grady of RedMonk summarized this phenomenon succinctly: “Big Data, like NoSQL, has become a liability in most contexts. Setting aside the lack of a consistent definition, the term is of little utility because it is single-dimensional. Larger dataset sizes present unique computational challenges. But the structure, workload, accessibility and even location of the data may prove equally challenging.”

Zack Urlocker, an advisor and board member to several startup companies in the area of SaaS, was equally vocal in his criticism regarding the complexity of existing systems: “You pretty much gotta be near genius level to build systems on top of Cassandra, Hadoop and the like today. These are powerful tools, but very low-level, equivalent to programming client server applications in assembly language. When it works its [sic] great, but the effort is significant and it’s probably beyond the scope of mainstream IT organizations.”

This is exactly where we are positioning Infinispan’s roadmap as we announce our initial steps into the area of a distributed execution and MapReduce framework built on top of Infinispan. Infinispan’s distributed data grid is a natural fit for such a platform: we have already built an infrastructure for essentially unlimited linear in-memory data scaling. However, having such a data grid without the ability to execute large scale computation on it is like having a Ferrari without a driver’s licence. Listening to the criticism regarding the lack of direction in the Big Data field and the complexity of existing distributed execution frameworks, we focused primarily on simplicity, without sacrificing the power and rich feature set such a framework should have.

Simple distributed execution model

The main interfaces for simple distributed task execution are DistributedCallable and DistributedExecutorService. DistributedCallable is essentially a version of the existing Callable from the java.util.concurrent package, except that a DistributedCallable can be executed in a remote JVM and receive input from an Infinispan cache. The task's main algorithm is essentially unchanged; only the input source is different. An existing Callable implementation most likely gets its input in the form of some Java object or primitive, while a DistributedCallable gets its input from an Infinispan cache. Therefore, users who have already implemented the Callable interface to describe their task units would simply extend DistributedCallable and use keys from the Infinispan execution environment as input for the task. An implementation of DistributedCallable can in fact continue to work as a plain Callable while simultaneously being ready for distributed execution.

public interface DistributedCallable<K, V, T> extends Callable<T> {

   /**
    * Invoked by execution environment after DistributedCallable
    * has been migrated for execution to a specific Infinispan node.
    *
    * @param cache
    *           cache whose keys are used as input data for
    *           this DistributedCallable task
    * @param inputKeys
    *           keys used as input for this DistributedCallable task
    */
   public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys);
}

DistributedExecutorService is a simple extension of the familiar ExecutorService from the java.util.concurrent package. However, its advantages are not to be overlooked. Existing Callable tasks that users would submit to an ExecutorService can now be submitted for execution on the Infinispan cluster. The Infinispan execution environment migrates such a task to an execution node, runs the task and returns the results to the calling node. Of course, not all Callable tasks would benefit from this feature; excellent candidates are long running and computationally intensive tasks.

The second advantage of the DistributedExecutorService is that it allows a quick and simple implementation of tasks that take input from Infinispan cache nodes, execute a certain computation and return results to the caller. Users specify which keys to use as input for a given DistributedCallable and submit that callable for execution on the Infinispan cluster. The Infinispan runtime locates the appropriate keys, migrates the DistributedCallable to the target execution node(s) and finally returns a list of results, one for each executed Callable. Of course, users can omit the input keys, in which case Infinispan executes the DistributedCallable on all keys of the specified cache. A sketch of this workflow follows.
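To make this concrete, here is a minimal sketch of such a task, assuming the DefaultExecutorService implementation and its submit(task, keys...) variant; the SumTask class, the Integer-valued cache and the keys "a" and "b" are hypothetical, invented for illustration:

import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.Future;

import org.infinispan.Cache;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.distexec.DistributedExecutorService;

// Hypothetical task that sums the Integer values stored under its input keys.
// Serializable so the task can be migrated to remote execution nodes.
public class SumTask implements DistributedCallable<String, Integer, Integer>, Serializable {

   private transient Cache<String, Integer> cache;
   private transient Set<String> inputKeys;

   @Override
   public void setEnvironment(Cache<String, Integer> cache, Set<String> inputKeys) {
      // Called on the execution node before call(), wiring in the local cache
      this.cache = cache;
      this.inputKeys = inputKeys;
   }

   @Override
   public Integer call() throws Exception {
      int sum = 0;
      for (String key : inputKeys)
         sum += cache.get(key);
      return sum;
   }

   public static void main(String[] args) throws Exception {
      Cache<String, Integer> cache = null; // obtain from a CacheManager in real code
      DistributedExecutorService des = new DefaultExecutorService(cache);

      // Submit the task with explicit input keys; omitting the keys would
      // run it against all keys of the cache.
      Future<Integer> future = des.submit(new SumTask(), "a", "b");
      Integer sum = future.get(); // blocks until the remote execution completes
      System.out.println("Sum = " + sum);
   }
}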

MapReduce model

Infinispan’s own MapReduce model is an adaptation of Google’s original MapReduce. There are four main components in each MapReduce task: Mapper, Reducer, Collator and MapReduceTask.

A Mapper is the component of a MapReduceTask that is invoked once for each input entry K,V. Each Mapper instance is migrated to an Infinispan node where, given a cache entry K,V input pair, it transforms that pair into a result T. The intermediate result T is further reduced using a Reducer.

public interface Mapper<K, V, T> {

   /**
    * Invoked once for each input cache entry
    * K,V; transforms that input into a result T.
    *
    * @param key
    *           the key
    * @param value
    *           the value
    * @return result T
    */
   T map(K key, V value);

}

Reducer, as its name implies, reduces the list of results T from the map phase of a MapReduceTask. The Infinispan distributed execution environment creates one instance of Reducer per execution node.

public interface Reducer<T, R> {

   /**
    * Reduces a result T from the map phase and returns R.
    * Assume that on Infinispan node N, an instance
    * of Mapper was invoked on k key/value pairs. Each T(i)
    * in the list of all T's returned from the map phase
    * executed on Infinispan node N is passed to the reducer
    * along with the previously computed R(i-1). After the last
    * invocation of the reducer, on T(k), the final R is
    * returned to the distributed task that originated the
    * map/reduce request.
    *
    * @param mapResult
    *           result T of the map phase
    * @param previouslyReduced
    *           previously accumulated reduced result
    * @return result R
    */
   R reduce(T mapResult, R previouslyReduced);

}

Collator coordinates results from Reducers executed on the Infinispan cluster and assembles the final result returned to the invoker of MapReduceTask.

public interface Collator<R> {

   /**
    * Collates all results added so far and
    * returns result R to the invoker of the distributed task.
    *
    * @return final result of the distributed task computation
    */
   R collate();

   /**
    * Invoked by the runtime every time a reduced result
    * R is received from a Reducer executed on a remote node.
    *
    * @param remoteNode
    *           address of the node where the reduce phase occurred
    * @param remoteResult
    *           the result R of the reduce phase
    */
   void reducedResultReceived(Address remoteNode, R remoteResult);
}

Finally, MapReduceTask is a distributed task uniting Mapper, Reducer and Collator into a cohesive large scale computation, transparently parallelized across Infinispan cluster nodes. Users of MapReduceTask need to provide a cache whose data is used as input for the task. The Infinispan execution environment will instantiate and migrate instances of the provided mappers and reducers seamlessly across Infinispan nodes. Unless an input-key filter is specified using the onKeys method, all available key/value pairs of the specified cache will be used as input data for the task; see the sketch below.
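For instance, restricting the input to particular keys might look like the following minimal sketch; the file names are hypothetical, and the type parameters are assumed to mirror the Mapper and Reducer signatures above, as in the GridFileSizeExample later in this post:

// Hypothetical: compute results for two specific files only. Without the
// onKeys call, every key/value pair in the cache is used as input.
MapReduceTask<String, GridFile.Metadata, Long, Long> task =
      new MapReduceTask<String, GridFile.Metadata, Long, Long>(cache);
task.onKeys("/docs/a.txt", "/docs/b.txt");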

MapReduceTask implements a slightly different execution model from the original MapReduce proposed by Google. Here is the pseudocode of MapReduceTask:

On each Infinispan node executing the task:

   mapped = list()
   for entry in cache.entries:
      t = mapper.map(entry.key, entry.value)
      mapped.add(t)

   r = null
   for t in mapped:
      r = reducer.reduce(t, r)
   return r to the Infinispan node that invoked the task

On the Infinispan node that invoked the task:

   reduced_results = invoke map/reduce task on all nodes, retrieve map{address: result}
   for r in reduced_results.entries:
      remote_address = r.key
      remote_reduced_result = r.value
      collator.reducedResultReceived(remote_address, remote_reduced_result)

   return collator.collate()

Examples

In order to get a better feel for the MapReduce framework, let’s have a look at an example involving Infinispan’s grid file system. How would we calculate the total size of all files in the system using the MapReduce framework? Easy! Have a look at GridFileSizeExample.

public class GridFileSizeExample {

   public static void main(String arg[]) throws Exception {

      Cache<String, GridFile.Metadata> cache = null; // obtain a cache reference here
      MapReduceTask<String, GridFile.Metadata, Long, Long> task =
            new MapReduceTask<String, GridFile.Metadata, Long, Long>(cache);

      Long result = task.mappedWith(new Mapper<String, GridFile.Metadata, Long>() {

         @Override
         public Long map(String key, GridFile.Metadata value) {
            return (long) value.getLength();
         }

      }).reducedWith(new Reducer<Long, Long>() {

         @Override
         public Long reduce(Long mapResult, Long previouslyReduced) {
            return previouslyReduced == null ? mapResult : mapResult + previouslyReduced;
         }

      }).collate(new Collator<Long>() {

         private Long result = 0L;

         @Override
         public Long collate() {
            return result;
         }

         @Override
         public void reducedResultReceived(Address remoteNode, Long remoteResult) {
            result += remoteResult;
         }
      });

      System.out.println("Total filesystem size is " + result + " bytes");
   }
}

In conclusion, this is not a perfect and final distributed execution and MapReduce API that satisfies the requirements of every user, but it is a good start. As we push forward and make it more feature rich while keeping it simple, we are continuously looking for your feedback. Together we can reach the ambitious goals set out at the beginning of this article.

Posted by Vladimir Blagojevic on 2011-01-19
Tags: distributed executors map reduce API

Wednesday, 13 May 2009

What's so cool about an asynchronous API?

Inspired by some thoughts from a recent conversation with JBoss Messaging’s Tim Fox, I’ve decided to go ahead and implement a new, asynchronous API for Infinispan.

To sum things up, this new API - additional methods on Cache - allows for asynchronous versions of put(), putIfAbsent(), putAll(), remove(), replace(), clear() and their various overloaded forms. Unimaginatively called putAsync(), putIfAbsentAsync(), etc., these new methods return a Future rather than the expected return type. E.g.,

V put(K key, V value);
Future<V> putAsync(K key, V value);

boolean remove(K key, V value);
Future<Boolean> removeAsync(K key, V value);

void clear();
Future<Void> clearAsync();

// ... etc ...

You guessed it, these methods do not block. They return immediately, and how cool is that! If you care about return values - or indeed simply want to wait until the operation completes - you do a Future.get(), which will block until the call completes. Why is this useful? Mainly because, in the case of clustered caches, it allows you to get the best of both worlds when it comes to synchronous and asynchronous mode transports.

Synchronous transports are normally recommended because of the guarantees they offer - the caller always knows that a call has properly propagated across the network, and is aware of any potential exceptions. However, asynchronous transports give you greater parallelism. You can start on the next operation even before the first one has made it across the network. But this is at a cost: losing out on the knowledge that a call has safely completed.

With this powerful new API though, you can have your cake and eat it too. Consider:

Cache<String, String> cache = getCache();
Future<String> f1 = cache.putAsync(k1, v1);
Future<String> f2 = cache.putAsync(k2, v2);
Future<String> f3 = cache.putAsync(k3, v3);

f1.get();
f2.get();
f3.get();

The network calls - possibly the most expensive part of a clustered write - involved for the 3 put calls can now happen in parallel. This is even more useful if the cache is distributed, and k1, k2 and k3 map to different nodes in the cluster - the processing required to handle the put operation on the remote nodes can happen simultaneously, on different nodes. And all the same, when calling Future.get(), we block until the calls have completed successfully. And we are aware of any exceptions thrown. With this approach, elapsed time taken to process all 3 puts should - theoretically, anyway - only be as slow as the single, slowest put().
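Since any failure surfaces when Future.get() is called, the caller can still handle replication errors per operation. Here is a minimal sketch, reusing the cache and keys from the snippet above; the try/catch follows the standard java.util.concurrent.Future contract rather than anything Infinispan-specific:

Future<String> f = cache.putAsync(k1, v1);
try {
   f.get(); // blocks until the put has safely propagated
} catch (java.util.concurrent.ExecutionException e) {
   // getCause() holds the exception the synchronous put() would have thrown
   System.err.println("Write failed: " + e.getCause());
}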

This new API is now in Infinispan’s trunk and yours to enjoy. It will be a main feature of my next release, which should be out in a few days. Please do give it a spin - I’d love to hear your thoughts and experiences.

Posted by Manik Surtani on 2009-05-13
Tags: asynchronous future API
