Continuous Queries

What You Will Learn

How to register a continuous query that monitors cached data in real time and triggers a callback whenever an entry matching your query is created or updated.

Prerequisites

  • Java 17+

  • An Infinispan Server running on localhost:11222 (or Docker/Podman available for Testcontainers)

Step 1: Define the Data Model

Create a Protobuf entity by annotating a record with @Proto:

@Proto
public record InstaPost(String id, String user, String hashtag){}

Then define a @ProtoSchema interface to generate the schema:

@ProtoSchema(schemaFileName = "instapost.proto",
      schemaPackageName = "tutorial",
      includeClasses = InstaPost.class)
public interface InstaSchema extends GeneratedSchema {
}

Add the generated schema initializer to the client configuration, and register the schema on the server when connecting.

Step 2: Create a Continuous Query

Define an Ickle query and attach a ContinuousQueryListener that fires when matching entries arrive:

      RemoteCache<String, InstaPost> cache = client.getCache(TutorialsConnectorHelper.TUTORIAL_CACHE_NAME);

      // Create a query with a userName parameter
      Query<InstaPost> query = cache.query("FROM tutorial.InstaPost p where p.user = :userName");

      // Set the parameter value
      query.setParameter("userName", "belen_esteban");

      // Create the continuous query
      ContinuousQuery<String, InstaPost> continuousQuery = cache.continuousQuery();

      // Create the continuous query listener.
      listener =
              new ContinuousQueryListener<>() {
                 // Called each time a new entry that matches the query arrives
                 @Override
                 public void resultJoining(String key, InstaPost post) {
                    System.out.println(String.format("@%s has posted again! Hashtag: #%s", post.user(), post.hashtag()));
                    queryPosts.add(post);
                 }
              };

      // Register the listener, together with its query, on the continuous query
      continuousQuery.addContinuousQueryListener(query, listener);
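
To make the callback semantics concrete, here is a minimal in-memory sketch in plain Java. It is not how Infinispan implements continuous queries: ObservedCache and Listener are hypothetical stand-ins, and a Predicate replaces the Ickle query. It models the three callbacks that ContinuousQueryListener exposes: resultJoining when an entry starts matching, resultUpdated when a matching entry changes and still matches, and resultLeaving when it stops matching. The tutorial listener above overrides only resultJoining.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ContinuousQuerySketch {

    // Plain record standing in for the tutorial's @Proto entity.
    record InstaPost(String id, String user, String hashtag) {}

    // Hypothetical listener mirroring the three ContinuousQueryListener callbacks.
    interface Listener<K, V> {
        default void resultJoining(K key, V value) {}
        default void resultUpdated(K key, V value) {}
        default void resultLeaving(K key) {}
    }

    // Hypothetical in-memory cache that evaluates the filter on every write.
    static final class ObservedCache<K, V> {
        private final Map<K, V> data = new HashMap<>();
        private final Predicate<V> filter;
        private final Listener<K, V> listener;

        ObservedCache(Predicate<V> filter, Listener<K, V> listener) {
            this.filter = filter;
            this.listener = listener;
        }

        void put(K key, V value) {
            V previous = data.put(key, value);
            boolean matchedBefore = previous != null && filter.test(previous);
            boolean matchesNow = filter.test(value);
            if (matchesNow && !matchedBefore) {
                listener.resultJoining(key, value);   // entry enters the result set
            } else if (matchesNow) {
                listener.resultUpdated(key, value);   // entry changed, still matches
            } else if (matchedBefore) {
                listener.resultLeaving(key);          // entry no longer matches
            }
        }
    }

    // Runs a small scenario and returns the events in the order they fired.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Listener<String, InstaPost> listener = new Listener<>() {
            @Override public void resultJoining(String key, InstaPost post) {
                events.add("joining " + key + " #" + post.hashtag());
            }
            @Override public void resultUpdated(String key, InstaPost post) {
                events.add("updated " + key + " #" + post.hashtag());
            }
            @Override public void resultLeaving(String key) {
                events.add("leaving " + key);
            }
        };
        ObservedCache<String, InstaPost> cache =
                new ObservedCache<>(p -> p.user().equals("belen_esteban"), listener);

        cache.put("1", new InstaPost("1", "belen_esteban", "coffee")); // joins
        cache.put("2", new InstaPost("2", "someone_else", "travel"));  // never matches
        cache.put("1", new InstaPost("1", "belen_esteban", "tea"));    // updated
        cache.put("1", new InstaPost("1", "someone_else", "tea"));     // leaves
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the real client, the server evaluates the query and delivers the events to the listener, so the filtering shown in put happens remotely rather than in application code.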

Step 3: Insert Data and Observe Notifications

As entries are added to the cache, the listener fires for every entry that matches the query:

      // Add size random posts (1000 in this tutorial)
      for (int i = 0; i < size; i++) {
         // Add a post
         addPost(cache, random);

         // Pause briefly so the notifications are visible as they arrive
         Thread.sleep(10);
      }
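
The addPost helper is not shown in this section. A minimal sketch of what such a helper might look like follows, assuming each post is stored under a fresh random key. The user and hashtag lists are invented for illustration (only belen_esteban comes from the query above), and the cache parameter is typed as a plain Map so the sketch runs without a server; a RemoteCache is a ConcurrentMap, so the same put call applies.

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class AddPostSketch {

    // Plain record standing in for the tutorial's @Proto entity.
    record InstaPost(String id, String user, String hashtag) {}

    // Illustrative sample values; only "belen_esteban" appears in the tutorial's query.
    static final List<String> USERS = List.of("belen_esteban", "user_one", "user_two");
    static final List<String> HASHTAGS = List.of("coffee", "travel", "music");

    // Hypothetical helper: stores one random post under a fresh key and returns it.
    static InstaPost addPost(Map<String, InstaPost> cache, Random random) {
        String id = UUID.randomUUID().toString();
        String user = USERS.get(random.nextInt(USERS.size()));
        String hashtag = HASHTAGS.get(random.nextInt(HASHTAGS.size()));
        InstaPost post = new InstaPost(id, user, hashtag);
        cache.put(id, post);
        return post;
    }

    public static void main(String[] args) {
        // In-memory map used here in place of a remote cache.
        Map<String, InstaPost> cache = new ConcurrentHashMap<>();
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(addPost(cache, random));
        }
    }
}
```

Because every post gets a new key, each matching post enters the query result for the first time, which is why overriding resultJoining alone is enough for this tutorial.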

When the listener is no longer needed, remove it by calling continuousQuery.removeContinuousQueryListener(listener) to avoid memory leaks.
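
One way to make sure the removal always happens is to pair registration and removal in a try/finally block. The sketch below shows the pattern in plain Java; Registry and withListener are hypothetical names used only for illustration, standing in for the add and remove calls on the continuous query.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class ListenerCleanupSketch {

    // Hypothetical stand-in for the object that tracks registered listeners.
    static final class Registry<L> {
        private final Set<L> listeners = new CopyOnWriteArraySet<>();
        void add(L listener) { listeners.add(listener); }
        void remove(L listener) { listeners.remove(listener); }
        int size() { return listeners.size(); }
    }

    // Registers the listener, runs the work, and always unregisters afterwards,
    // even if the work throws.
    static <L> void withListener(Registry<L> registry, L listener, Runnable work) {
        registry.add(listener);
        try {
            work.run();
        } finally {
            registry.remove(listener);
        }
    }

    public static void main(String[] args) {
        Registry<Runnable> registry = new Registry<>();
        Runnable listener = () -> {};
        withListener(registry, listener,
                () -> System.out.println("registered while working: " + registry.size()));
        System.out.println("registered after cleanup: " + registry.size());
    }
}
```

In the tutorial code, the body of the try block would be the insert loop from Step 3, and the finally block would call continuousQuery.removeContinuousQueryListener(listener).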

Step 4: Run the Tutorial

mvn package exec:java

You should see output showing notifications every time the matching user posts, followed by totals.

What’s Next