Continuous Queries

What You Will Learn

How to register a continuous query that monitors cached data in real time and triggers a callback whenever an entry matching your query is created or updated.

Prerequisites

  • Java 17+

  • An Infinispan Server running on localhost:11222 (or Docker/Podman available for Testcontainers)

Start an Infinispan Server with Docker or Podman:

docker run -it --rm -p 11222:11222 -e USER=admin -e PASS=password quay.io/infinispan/server:latest
Tip
You can replace docker with podman in the command above if you use Podman.
Tip
If no server is running, the tutorial code automatically starts an Infinispan Server using Testcontainers.

Step 1: Define the Data Model

Create a Protobuf entity using the @Proto annotation and register the schema with a @ProtoSchema interface:

@Proto
public record InstaPost(String id, String user, String hashtag){}

Then define a @ProtoSchema interface to generate the schema:

@ProtoSchema(schemaFileName = "instapost.proto",
      schemaPackageName = "tutorial",
      includeClasses = InstaPost.class)
public interface InstaSchema extends GeneratedSchema {
}

Add the schema initializer to the client configuration and register it on the server when connecting.

Step 2: Create a Continuous Query

Define an Ickle query and attach a ContinuousQueryListener that fires when matching entries arrive:

      RemoteCache<String, InstaPost> cache = client.getCache(TutorialsConnectorHelper.TUTORIAL_CACHE_NAME);

      // Create a query with lastName parameter
      Query<InstaPost> query = cache.query("FROM tutorial.InstaPost p where p.user = :userName");

      // Set the parameter value
      query.setParameter("userName", "belen_esteban");

      // Create the continuous query
      ContinuousQuery<String, InstaPost> continuousQuery = cache.continuousQuery();

      // Create the continuous query listener.
      listener =
              new ContinuousQueryListener<>() {
                 // This method will be executed every time new items that correspond with the query arrive
                 @Override
                 public void resultJoining(String key, InstaPost post) {
                    System.out.println(String.format("@%s has posted again! Hashtag: #%s", post.user(), post.hashtag()));
                    queryPosts.add(post);
                 }
              };

      // And the listener corresponding the query to the continuous query
      continuousQuery.addContinuousQueryListener(query, listener);

Step 3: Insert Data and Observe Notifications

As entries are added to the cache, the listener fires for every entry that matches the query:

      // Add 1000 random posts
      for (int i = 0; i < size; i++) {
         // Add a post
         addPost(cache, random);

         // Await a little to see results
         Thread.sleep(10);
      }

Remember to remove the listener when it is no longer needed to avoid memory leaks by calling continuousQuery.removeContinuousQueryListener(listener).

Step 4: Run the Tutorial

mvn package exec:java

You should see output showing notifications every time the matching user posts, followed by totals.

What’s Next