Note: Documentation for Pivotal GemFire 7.0.x is now available at http://gemfire.docs.pivotal.io/7.0.2/index.html. Please refer to the Pivotal site for the latest and most up-to-date documentation on GemFire 7.0.x. The vFabric GemFire 7.0 documentation site will no longer be updated.

Writing the CQ Listener or CQ Status Listener

The following C++ and C# .NET examples show how you might program a simple CqListener or CqStatusListener to update a display screen based on the CQ events it receives.

The listener retrieves the queryOperation and entry key and value from the CqEvent, then updates the screen according to the operation type provided in queryOperation.

CQ events do not change your client cache. They are provided as an event service only. This allows you to have any collection of CQs without storing large amounts of data in your regions. If you need to persist information from CQ events, program your listener to store the information where it makes the most sense for your application.

Be very careful if you choose to update your cache from your CqListener. If your listener updates the region that is queried in its own CQ, the update may be forwarded to the server. If the update on the server satisfies the same CQ, it may be returned to the same listener that did the update, which could put your application into an infinite loop. This same scenario could be played out with multiple regions and multiple CQs if the listeners are programmed to update each other's regions.

CqListener Implementation (C++)

// CqListener class
// Example listener that keeps a trade-order display in step with the CQ
// events it receives. The ". . ." comments mark where application-specific
// display code belongs.
class TradeEventListener : public CqListener {
 public:
  // Reads the query operation, key and new value from the event, then
  // branches on the operation type to update the screen.
  void onEvent(const CqEvent& cqEvent) {
    // Operation associated with the query op
    CqOperation::CqOperationType queryOperation = cqEvent.getQueryOperation();
    // key and new value from the event
    CacheableKeyPtr key = cqEvent.getKey();
    TradeOrderPtr tradeOrder = dynCast<TradeOrderPtr>(cqEvent.getNewValue());
    if (queryOperation == CqOperation::OP_TYPE_UPDATE) {
      // update data on the screen for the trade order
      // . . .
    }
    else if (queryOperation == CqOperation::OP_TYPE_CREATE) {
      // add the trade order to the screen
      // . . .
    }
    else if (queryOperation == CqOperation::OP_TYPE_DESTROY) {
      // remove the trade order from the screen
      // . . .
    }
  }

  // Error callback; intentionally left as a stub in this example.
  void onError(const CqEvent& cqEvent) {
    // handle the error
  }

  // Cleanup callback for releasing whatever the listener holds.
  void close() {
    // close the output screen for the trades
    // . . .
  }
};

CqListener Implementation (C# .NET)

// CqListener class
// Example listener that refreshes a trade-order display from CQ events.
public class TradeEventListener : ICqListener {
  // Extracts the operation, key and new value from the event and then
  // dispatches on the operation type.
  public void onEvent(CqEvent cqEvent) {
     // Operation associated with the query op
     CqOperationType queryOperation = cqEvent.getQueryOperation();

     // Key of the affected entry, also viewed as a string key
     ICacheableKey key = cqEvent.getKey();
     CacheableString keyStr = key as CacheableString;

     // New value carried by the event, viewed as a trade order
     IGFSerializable val = cqEvent.getNewValue();
     TradeOrder tradeOrder = val as TradeOrder;

     switch (queryOperation) {
        case CqOperationType.OP_TYPE_UPDATE:
           // update data on the screen for the trade order
           // . . .
           break;
        case CqOperationType.OP_TYPE_CREATE:
           // add the trade order to the screen
           // . . .
           break;
        case CqOperationType.OP_TYPE_DESTROY:
           // remove the trade order from the screen
           // . . .
           break;
        default:
           // any other operation type is ignored
           break;
     }
  }

  // Error callback; left as a stub in this example.
  public void onError(CqEvent cqEvent) {
     // handle the error
  }

  // From CacheCallback
  public void close() {
     // close the output screen for the trades
     // . . .
  }
}

Writing a CqStatusListener

If you need your CQs to detect whether they are connected to any of the servers that host their subscription queues, implement a CqStatusListener instead of a CqListener.

CqStatusListener extends the current CqListener, allowing a client to detect when a CQ is connected and/or disconnected from the server(s). The onCqConnected() method will be invoked when the CQ is connected, and when the CQ has been reconnected after being disconnected. The onCqDisconnected() method will be invoked when the CQ is no longer connected to any servers.

Taking the examples from above, we can instead implement a CqStatusListener.

When you install the CqStatusListener, your listener will be able to detect its connection status to the servers that it is querying.

CqStatusListener Implementation (C++)

// Example CqStatusListener that tallies the CQ events, errors and
// connection-status callbacks it receives.
class MyCqStatusListener : public CqStatusListener {
  uint8_t m_id;                     // identifier supplied at construction
  uint32_t m_numInserts;            // OP_TYPE_CREATE events seen
  uint32_t m_numUpdates;            // OP_TYPE_UPDATE events seen
  uint32_t m_numDeletes;            // OP_TYPE_DESTROY events seen
  uint32_t m_numEvents;             // every event passed to updateCount()
  uint32_t m_cqsConnectedCount;     // onCqConnected() invocations
  uint32_t m_cqsDisconnectedCount;  // onCqDisconnected() invocations

 public:
  // Read-only accessors; const so they can be used through const references.
  uint8_t getId() const
  {
    return m_id;
  }
  uint32_t getNumInserts() const
  {
    return m_numInserts;
  }
  uint32_t getNumUpdates() const
  {
    return m_numUpdates;
  }
  uint32_t getNumDeletes() const
  {
    return m_numDeletes;
  }
  uint32_t getNumEvents() const
  {
    return m_numEvents;
  }
  uint32_t getCqsConnectedCount() const
  {
    return m_cqsConnectedCount;
  }
  uint32_t getCqsDisConnectedCount() const
  {
    return m_cqsDisconnectedCount;
  }

  // Stores the id and starts every counter at zero. The initializers are
  // listed in declaration order, which is the order members are actually
  // initialized in.
  MyCqStatusListener(uint8_t id):
    m_id(id),
    m_numInserts(0),
    m_numUpdates(0),
    m_numDeletes(0),
    m_numEvents(0),
    m_cqsConnectedCount(0),
    m_cqsDisconnectedCount(0)
  {
  }

  // Bumps the total event count, then the counter that matches the event's
  // query operation. Other operation types only affect the total.
  void updateCount(const CqEvent& cqEvent)
  {
    m_numEvents++;
    switch (cqEvent.getQueryOperation())
    {
      case CqOperation::OP_TYPE_CREATE:
        m_numInserts++;
        break;
      case CqOperation::OP_TYPE_UPDATE:
        m_numUpdates++;
        break;
      case CqOperation::OP_TYPE_DESTROY:
        m_numDeletes++;
        break;
      default:
        break;
    }
  }

  void onEvent(const CqEvent& cqe) {
    updateCount(cqe);
  }

  // Error events go through the same tally as regular events.
  void onError(const CqEvent& cqe) {
    updateCount(cqe);
  }

  void close() {
  }

  void onCqDisconnected() {
    // This is called when the cq loses connection with all servers.
    m_cqsDisconnectedCount++;
  }

  void onCqConnected() {
    // This is called when the cq establishes a connection with a server.
    m_cqsConnectedCount++;
  }

  // Resets every counter to zero; the id is left unchanged.
  void clear() {
    m_numInserts = 0;
    m_numUpdates = 0;
    m_numDeletes = 0;
    m_numEvents = 0;
    m_cqsConnectedCount = 0;
    m_cqsDisconnectedCount = 0;
  }
};

CqStatusListener Implementation (C# .NET)

  // Example ICqStatusListener that counts events and errors separately for
  // the periods before and after failedOver() is called, and counts the
  // connected / disconnected callbacks it receives.
  public class MyCqStatusListener<TKey, TResult> : ICqStatusListener<TKey, TResult>
  {
    #region Private members
    // Set by failedOver(); selects which pair of counters is incremented.
    private bool m_failedOver = false;
    // Tallies recorded while m_failedOver is false.
    private UInt32 m_eventCountBefore = 0;
    private UInt32 m_errorCountBefore = 0;
    // Tallies recorded once m_failedOver is true.
    private UInt32 m_eventCountAfter = 0;
    private UInt32 m_errorCountAfter = 0;
    // Number of OnCqConnected / OnCqDisconnected callbacks received.
    private UInt32 m_CqConnectedCount = 0;
    private UInt32 m_CqDisConnectedCount = 0;
    #endregion

    #region Public accessors
    // The id argument is accepted but not stored.
    public MyCqStatusListener(int id)
    {
    }

    // Switches counting from the "before" counters to the "after" counters.
    public void failedOver()
    {
      m_failedOver = true;
    }

    public UInt32 getEventCountBefore() { return m_eventCountBefore; }

    public UInt32 getErrorCountBefore() { return m_errorCountBefore; }

    public UInt32 getEventCountAfter() { return m_eventCountAfter; }

    public UInt32 getErrorCountAfter() { return m_errorCountAfter; }

    public UInt32 getCqConnectedCount() { return m_CqConnectedCount; }

    public UInt32 getCqDisConnectedCount() { return m_CqDisConnectedCount; }
    #endregion

    // Counts the event against the period selected by m_failedOver.
    public virtual void OnEvent(CqEvent<TKey, TResult> ev)
    {
      if (!m_failedOver)
      {
        m_eventCountBefore++;
        return;
      }
      m_eventCountAfter++;
    }

    // Counts the error against the period selected by m_failedOver.
    public virtual void OnError(CqEvent<TKey, TResult> ev)
    {
      if (!m_failedOver)
      {
        m_errorCountBefore++;
        return;
      }
      m_errorCountAfter++;
    }

    public virtual void Close()
    {
    }

    public virtual void OnCqConnected()
    {
      m_CqConnectedCount++;
    }

    public virtual void OnCqDisconnected()
    {
      m_CqDisConnectedCount++;
    }

    // Zeroes all six counters; the failed-over flag is left as it is.
    public virtual void Clear()
    {
      m_eventCountBefore = 0;
      m_eventCountAfter = 0;
      m_errorCountBefore = 0;
      m_errorCountAfter = 0;
      m_CqConnectedCount = 0;
      m_CqDisConnectedCount = 0;
    }
  }