Implementing an AsyncEventListener for Write-Behind Cache Event Handling

An AsyncEventListener receives callbacks for events that change region data. You can use an AsyncEventListener implementation as a write-behind cache event handler to synchronize region updates with a database.

How an AsyncEventListener Works

An AsyncEventListener instance is serviced by its own dedicated thread in which a callback method is invoked. Events that update a region are placed in an internal AsyncEventQueue, and the dedicated thread dispatches a batch of events at a time to the listener implementation.

You can configure an AsyncEventQueue to be either serial or parallel. A serial queue is deployed to one GemFire member, and it delivers all of a region's events in order to a configured AsyncEventListener implementation. A parallel queue is deployed to multiple GemFire members, and each instance of the queue simultaneously delivers region events to a local AsyncEventListener implementation.

While a parallel queue provides the best throughput for writing events, it provides less control for ordering those events. With a parallel queue, you cannot preserve event ordering for a region as a whole because multiple GemFire servers queue and deliver the region's events at the same time. However, the ordering of events for a given partition (or for a given queue of a distributed region) can be preserved.

For both serial and parallel queues, you can control the maximum amount of memory that each queue uses, as well as the batch size and frequency for processing batches in the queue. You can also configure queues to persist to disk (instead of simply overflowing to disk) so that write-behind caching can pick up where it left off when a member shuts down and is later restarted.
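
To make the batch size and batch frequency settings concrete, the following is a minimal sketch of a size-or-time trigger in plain Java. It is not GemFire code, and the dispatcher's real internals may differ. The class name BatchTrigger and its methods are hypothetical and exist only for illustration.

```java
/** Hypothetical model of a size-or-time batch trigger; not part of the GemFire API. */
final class BatchTrigger {
  private final int batchSize;            // maximum number of events in one batch
  private final long batchIntervalMillis; // longest time a partial batch may wait

  BatchTrigger(int batchSize, long batchIntervalMillis) {
    if (batchSize < 1 || batchIntervalMillis < 0) {
      throw new IllegalArgumentException("batchSize must be >= 1 and interval >= 0");
    }
    this.batchSize = batchSize;
    this.batchIntervalMillis = batchIntervalMillis;
  }

  /** True when a full batch is queued, or a partial batch has waited long enough. */
  boolean shouldDispatch(int queuedEvents, long millisSinceLastDispatch) {
    if (queuedEvents <= 0) {
      return false; // nothing to deliver
    }
    return queuedEvents >= batchSize || millisSinceLastDispatch >= batchIntervalMillis;
  }

  /** Number of events that would be handed to the listener in the next batch. */
  int nextBatchLength(int queuedEvents) {
    return Math.min(Math.max(queuedEvents, 0), batchSize);
  }
}
```

Under this model, a larger batch size means fewer but larger database round trips, and a shorter interval bounds how long a lightly loaded queue holds an event before writing it.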

Optionally, a serial queue can use multiple threads to dispatch queued events. When you configure multiple threads for a serial queue, the logical queue that is hosted on a GemFire member is divided into multiple physical queues, each with a dedicated dispatcher thread. You can then configure whether the threads dispatch queued events by key, by thread, or in the same order in which events were added to the queue.

Note: You cannot configure multiple dispatcher threads or the ordering policy for a parallel event queue.
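
The idea behind dispatching by key for a serial queue with multiple dispatcher threads can be sketched as follows. This is an illustration only, assuming a simple hash-based assignment; how GemFire actually assigns events to its physical queues is internal to the product. The class KeyOrderedRouter and its methods are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of key-based ordering across physical queues; not GemFire code. */
final class KeyOrderedRouter<K, V> {
  // One list per dispatcher thread stands in for one physical queue.
  private final List<List<Map.Entry<K, V>>> physicalQueues = new ArrayList<>();

  KeyOrderedRouter(int dispatcherThreads) {
    if (dispatcherThreads < 1) {
      throw new IllegalArgumentException("need at least one dispatcher thread");
    }
    for (int i = 0; i < dispatcherThreads; i++) {
      physicalQueues.add(new ArrayList<>());
    }
  }

  /** The same key always maps to the same physical queue. */
  int queueIndexFor(K key) {
    return Math.floorMod(key.hashCode(), physicalQueues.size());
  }

  /** Appends the event to the queue chosen for its key, preserving arrival order per key. */
  void enqueue(K key, V value) {
    physicalQueues.get(queueIndexFor(key)).add(new AbstractMap.SimpleImmutableEntry<>(key, value));
  }

  /** Returns the events held by one physical queue, oldest first. */
  List<Map.Entry<K, V>> queue(int index) {
    return physicalQueues.get(index);
  }
}
```

Because every update to a given key lands in one queue that is drained by one thread, updates to that key reach the listener in order, while updates to different keys may be delivered concurrently.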

Guidelines for Using an AsyncEventListener

Review the following guidelines before using an AsyncEventListener:
  • If you use an AsyncEventListener to implement a write-behind cache listener, your code should check for the possibility that an existing database connection may have been closed due to an earlier exception. For example, check for Connection.isClosed() in a catch block and re-create the connection as needed before performing further operations.
  • Use a serial AsyncEventQueue if you need to preserve the order of region events when delivering events to your listener implementation. Use parallel queues when the order of events is not important, and when you require maximum throughput for processing events.
  • You must install the AsyncEventListener implementation on a GemFire member that hosts the region whose events you want to process.
  • If you configure a parallel AsyncEventQueue, deploy the queue on each GemFire member that hosts the region.
  • You can install a listener on more than one member to provide high availability and guarantee delivery of events if a member with the active AsyncEventListener shuts down. At any given time, only one member has an active thread for dispatching events. The threads on other members remain on standby for redundancy.
  • Install no more than one standby listener (redundancy of at most one) for performance and memory reasons.
  • To preserve pending events through member shutdowns, configure GemFire to persist the internal queue of the AsyncEventListener to an available disk store. By default, any pending events that reside in the internal queue of an AsyncEventListener are lost if the active listener's member shuts down.
  • To ensure high availability and reliable delivery of events, configure the event queue to be both persistent and redundant.
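
The first guideline above, checking whether a database connection has been closed and re-creating it, might be handled with a small helper like the one below. This is a sketch under stated assumptions, not a GemFire or JDBC-driver facility: ReconnectingConnectionHolder and ConnectionFactory are hypothetical names, and a real listener would probably back the factory with DriverManager.getConnection or a connection pool.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper that hands out a usable JDBC connection, reopening it when needed. */
final class ReconnectingConnectionHolder {

  /** Hypothetical factory; a real listener might wrap DriverManager.getConnection here. */
  interface ConnectionFactory {
    Connection open() throws SQLException;
  }

  private final ConnectionFactory factory;
  private Connection connection;

  ReconnectingConnectionHolder(ConnectionFactory factory) {
    this.factory = factory;
  }

  /** Returns the current connection, opening a new one if none exists or the old one is closed. */
  synchronized Connection get() throws SQLException {
    if (connection == null || connection.isClosed()) {
      connection = factory.open();
    }
    return connection;
  }
}
```

A listener could call get() at the start of each batch, or from a catch block after a failed write, so that one failure does not leave later batches using a dead connection.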

Implementing an AsyncEventListener

To receive region events for processing, you create a class that implements the AsyncEventListener interface. The processEvents method in your listener receives a list of queued AsyncEvent objects in each batch.

Each AsyncEvent object contains information about a region event, such as the name of the region where the event occurred, the type of region operation, and the affected key and value.

The basic framework for implementing a write-behind event handler involves iterating through the batch of events and writing each event to a database. For example:
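class MyAsyncEventListener implements AsyncEventListener {

  @Override
  public boolean processEvents(List<AsyncEvent> events) {

    // Process each AsyncEvent in the batch
    for (AsyncEvent event : events) {

      // Write the event to a database

    }

    // Return true to indicate that the batch was processed successfully
    return true;
  }

  @Override
  public void close() {
    // Release resources, such as database connections
  }
}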

Processing AsyncEvents

When processing AsyncEvents, you can use the getDeserializedValue method to obtain cache values for entries that have been updated or created. Since the getDeserializedValue method will return a null value for destroyed entries, you should use the getKey method to obtain references to cache objects that have been destroyed. Here's an example of processing AsyncEvents:
public boolean processEvents(@SuppressWarnings("rawtypes") List<AsyncEvent> list)
{
    logger.log(Level.INFO, String.format("Size of List<AsyncEvent> = %s", list.size()));

    // JdbcBatch is an application-defined value type
    List<JdbcBatch> newEntries = new ArrayList<JdbcBatch>();
    List<JdbcBatch> updatedEntries = new ArrayList<JdbcBatch>();
    List<String> destroyedEntries = new ArrayList<String>();
    int possibleDuplicates = 0;

    for (@SuppressWarnings("rawtypes") AsyncEvent ge : list)
    {
        // Count events that may already have been delivered
        if (ge.getPossibleDuplicate())
            possibleDuplicates++;

        if (ge.getOperation().equals(Operation.UPDATE))
        {
            updatedEntries.add((JdbcBatch) ge.getDeserializedValue());
        }
        else if (ge.getOperation().equals(Operation.CREATE))
        {
            newEntries.add((JdbcBatch) ge.getDeserializedValue());
        }
        else if (ge.getOperation().equals(Operation.DESTROY))
        {
            // Destroyed entries have no value, so record the key instead
            destroyedEntries.add(ge.getKey().toString());
        }
    }

    // Write the collected entries to the database here

    return true;
}

Configuring an AsyncEventListener

To configure a write-behind cache listener, you first configure an asynchronous queue to dispatch the region events, and then create the queue with your listener implementation. You then assign the queue to a region in order to process that region's events.

Procedure
  1. Configure a unique AsyncEventQueue with the name of your listener implementation. You can optionally configure the queue for parallel operation, persistence, batch size, and maximum memory size. See WAN Configuration for more information.
    cache.xml Configuration
    <cache>
       <async-event-queue id="sampleQueue" persistent="true"
        disk-store-name="exampleStore" parallel="false">
          <async-event-listener>
             <class-name>MyAsyncEventListener</class-name>
             <parameter name="url"> 
               <string>jdbc:db2:SAMPLE</string> 
             </parameter> 
             <parameter name="username"> 
               <string>gfeadmin</string> 
             </parameter> 
             <parameter name="password"> 
               <string>admin1</string> 
             </parameter> 
          </async-event-listener>
        </async-event-queue>
    ...
    </cache>
    Java Configuration
    Cache cache = new CacheFactory().create();
    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
    factory.setPersistent(true);
    factory.setDiskStoreName("exampleStore");
    factory.setParallel(false);
    AsyncEventListener listener = new MyAsyncEventListener();
    AsyncEventQueue asyncQueue = factory.create("customerWB", listener);
  2. If you are using a parallel AsyncEventQueue, repeat the above configuration in each GemFire member that will host the region. Use the same ID and configuration settings for each queue configuration.
  3. On each GemFire member that hosts the AsyncEventQueue, assign the queue to each region that you want to use with the AsyncEventListener implementation.

    cache.xml Configuration
    <cache>
    <region name="data">
    	   <region-attributes async-event-queue-ids="sampleQueue">
        </region-attributes>
      </region>
    ...
    </cache>
    Java Configuration
    RegionFactory rf1 = cache.createRegionFactory();
    rf1.addAsyncEventQueueId(asyncQueue.getId());
    Region customer = rf1.create("Customer");
        
    // Assign the queue to multiple regions as needed
    RegionFactory rf2 = cache.createRegionFactory();
    rf2.addAsyncEventQueueId(asyncQueue.getId());
    Region order = rf2.create("Order");
    Using the Java API, you can also add queues to, and remove queues from, regions that have already been created:
    AttributesMutator mutator = order.getAttributesMutator();
    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
    for (AsyncEventQueue queue : queues) {
        if (queue.getId().equals("customerWB")) {
            mutator.addAsyncEventQueueId(queue.getId());
        }
    }

    See the GemFire API documentation for more information.

  4. Optionally configure persistence and conflation for the queue using the instructions in Configuring Event Queues.
  5. For serial queues only, optionally configure multiple dispatcher threads and the ordering policy for the queue using the instructions in Configuring Dispatcher Threads and Order Policy for Event Distribution.
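
The cache.xml example in step 1 declares url, username, and password parameters for the listener. A class declared in cache.xml typically receives such parameters as a java.util.Properties object through its Declarable init method. The following is a minimal sketch of reading and validating them. ListenerJdbcSettings is a hypothetical helper name, and it deliberately has no GemFire dependency so that it compiles on its own.

```java
import java.util.Properties;

/** Hypothetical holder for the JDBC settings declared as <parameter> elements in cache.xml. */
final class ListenerJdbcSettings {
  final String url;
  final String username;
  final String password;

  private ListenerJdbcSettings(String url, String username, String password) {
    this.url = url;
    this.username = username;
    this.password = password;
  }

  /** Reads the three expected parameters and fails fast if any is missing or blank. */
  static ListenerJdbcSettings fromProperties(Properties props) {
    return new ListenerJdbcSettings(
        require(props, "url"),
        require(props, "username"),
        require(props, "password"));
  }

  private static String require(Properties props, String name) {
    String value = props.getProperty(name);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing listener parameter: " + name);
    }
    return value.trim();
  }
}
```

A listener could call ListenerJdbcSettings.fromProperties from its init method and keep the result for opening database connections later. Failing fast there surfaces a misconfigured cache.xml at startup rather than when the first batch arrives.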

The AsyncEventListener receives events from every region configured with the associated AsyncEventQueue.
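
Because one batch can therefore mix events from several regions, a listener that writes each region to a different table may want to split the batch first. The following is a minimal sketch in plain Java. RegionBatchSplitter is a hypothetical helper, and the region name is supplied by a caller-provided function so that the example does not depend on GemFire classes. With real AsyncEvent objects, that function would probably be something like e -> e.getRegion().getName().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper that splits one batch of events into per-region lists. */
final class RegionBatchSplitter {
  private RegionBatchSplitter() {}

  /**
   * Groups events by region name. Event order is preserved within each region,
   * and regions appear in the order in which they are first seen in the batch.
   */
  static <E> Map<String, List<E>> byRegion(List<E> events, Function<E, String> regionNameOf) {
    Map<String, List<E>> grouped = new LinkedHashMap<>();
    for (E event : events) {
      grouped.computeIfAbsent(regionNameOf.apply(event), name -> new ArrayList<>()).add(event);
    }
    return grouped;
  }
}
```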