Implementing an AsyncEventListener

You implement an AsyncEventListener by defining a class that implements the AsynchEventListener interface, installing the AsynchEventListener configuration, and associating a table with it.

The example procedures show how to configure an AsyncEventListener with redundancy 1 in a SQLFire system having one peer-client member and three data store members. Two data stores "DataStore1" and "DataStore2" are available in server group "SG1." The AsyncEventListener will be installed on only those members that belong to server group 'SG1'. A single table, "TESTTABLE" is associated with the AsyncEventListener.

When testing your own AsyncEventListener implementation you will also need to stop the listener configuration. Stop or Remove an AsyncEventListener Configuration describes this process.

Implement the AsyncEventListener Interface

Define a class that implements the http://www.vmware.com/support/developer/vfabric-sqlfire/103-api/com/vmware/sqlfire/callbacks/AsyncEventListener.html interface. This example uses TestAsyncEventListener.java.
package user.application;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.sql.ResultSetMetaData;
import java.util.Iterator;
import java.util.List;

import com.vmware.sqlfire.callbacks.AsyncEventListener;
import com.vmware.sqlfire.callbacks.Event;

public class TestAsyncEvenListener implements AsyncEventListener
{

  private FileOutputStream fos;

  public void close()
  {
    try {
      this.fos.close();
    }
    catch (IOException e) {
      //handle exception
    }
  }

  public void init(String filePath)
  {
    try {
      this.fos = new FileOutputStream(filePath);
    }
    catch (FileNotFoundException e) {
      //handle exception
    }
  }

  public boolean processEvents(List<Event> events)
  {
    try {
      Iterator<Event> itr = events.iterator();
      Event.Type evType = null;
      for (Event event : events) {
        ResultSetMetaData rsmd = event.getResultSetMetaData();
        String schemaName = rsmd.getSchemaName(1 /*from column  # 1*/);
        String tableName = rsmd.getTableName(1 /*from column  # 1*/);
        evType = event.getType();
        //The primary key could be either a single  value for single column 
        // primary key or could be  an Object[] in case of Composite primary key.
        //For tables without primary key defined, the value returned is a Long 
        // which uniquely identifies the row. User can map rows created/updated/deleted
        // using this value.
        Object pk = event.getPrimaryKey();
        switch (evType) {
        case AFTER_INSERT:
          List<Object> rowInserted = event.getNewRow();
          
          int i =1;
          for (Object colVal : rowInserted) {
             String columnName = rsmd.getColumnName(i);
             // use the column name & value given by colVal to process the field
          }
          //process the row , may be persist to file.
          break;
        case AFTER_UPDATE:
          List<Object> rowUpdated = event.getNewRow();
          //Modified column's column position is given by the below API.
          //The unmodified cols are null 
          List<Integer> modifiedCols = event.getModifiedColumns();
          for (int colNum : modifiedCols) {
            //The column number which is modified is given by colNum.
            //Obtain info about modified column
            String colName = rsmd.getColumnName(colNum);
            //Obtain the modified value of the column
            //Notice that the value is obtained at position colNum -1, as the 
            //List containing values is zero based while column position in table
            // is 1 based.
            Object newColNalue = rowUpdated.get(colNum - 1);
            //process the information using the column name , table name, schema name 
            //& modified value.              
          }

          break;
        case AFTER_DELETE:
             //Utilize the pk , primary key for processing the deleted row.
          break;
        }
      }
    }
    catch (Exception e) {
      //handle exception
      return false;
    }
    return true;
  }
}

Install the AsyncEventListener Configuration

Install the AsyncEventListener implementation by executing the CREATE ASYNCEVENTLISTENER statement on any member of the SQLFire system. You must specify the server groups on which to deploy the implementation, as well as a unique identification and class name for the implementation.

For example:

CREATE ASYNCEVENTLISTENER MyListener
(
   LISTENERCLASSNAME 'user.application.TestAsyncEventListener'
   INITPARAMS ''
)  SERVER GROUPS ( SG1 );
Note: You must include the INITPARAMS argument, even if it defines no parameters.
Note: You can optionally install the AsyncEventListener configuration after you associate a table with the listener name. Make sure that you use the same listener name with both the CREATE ASYNCEVENTLISTNER command and the CREATE TABLE command.

This example installs TestAsyncEventListener with the identifier "MyListener" on members of the server group "SG1." See CREATE ASYNCEVENTLISTENER for more information about other parameters to the procedure.

Start the AsyncEventListener Configuration

After you install the AsyncEventListener implementation, you can start it by specifying the unique identifier for the implementation in the SYS.START_ASYNC_EVENT_LISTENER procedure. For example:

java.sql.Connection conn = getConnection();    
    CallableStatement cs = conn.prepareCall("call SYS.START_ASYNC_EVENT_LISTENER( ? )");
    cs.setString(1, "MyID");
    cs.execute();

On execution, a thread would service the queue of the AsyncEventListener and dispatch events to the callback class.

Associate a Table with an AsyncEventListener Configuration

You associate a table with an AsyncEventListener configuration using the AsyncEventListener clause in the table's CREATE TABLE statement. For example, to associate the table "TestTable" with an AsyncEventListenerConfiguration identified by "MyID" use the statement:

 java.sql.Connection conn = getConnection();    
 Statement stmt = conn.createStatement();
 stmt.execute("CREATE TABLE TESTTABLE (ID int NOT NULL, DESCRIPTION varchar(1024),
 ADDRESS varchar(1024) ) AsyncEventListener(MyID) " ); 
Note: Because the associated listener configuration does not have to be available at the time you create the table, SQLFire does not display an error message if the specified listener name does not yet exist. Make sure that you use the same listener name with both the CREATE ASYNCEVENTLISTNER command and the CREATE TABLE command.
Note: You can optionally associate the table with an AsyncEventListener configuration after you create the table, using the SQLFire ALTER TABLE statement.

Display Installed AsyncEventListeners

You can join the SYSTABLES, MEMBERS, and ASYNCEVENTLISTENERS tables to display the listener implementations associated with a table as well as the data store ID on which the listener is installed:
select t.*, m.ID DSID from SYS.SYSTABLES t, SYS.MEMBERS m, SYS.ASYNCEVENTLISTENERS a
       where t.tablename='<table>' and groupsintersect(a.SERVER_GROUP, m.SERVERGROUPS)
       and groupsintersect(t.ASYNCLISTENERS, a.ID);

See SYSTABLES and ASYNCEVENTLISTENERS.

Stop or Remove an AsyncEventListener Configuration

To stop a running AsyncEventListener configuration, use the SYS.STOP_ASYNC_EVENT_LISTENER procedure and specify the identifier to the configuration to stop:

    java.sql.Connection conn = getConnection();    
    CallableStatement cs = conn.prepareCall("call SYS.STOP_ASYNC_EVENT_LISTENER( ? )");
    cs.setString(1, "MyID");
    cs.execute();

This stops dispatching events to the callback listener class. Also, the close() method of the Callback AsyncEventListener class is invoked so that the implementation can perform any necessary cleanup.

You can also remove a listener implementation from the cluster using DROP ASYNCEVENTLISTENER.