Use continuous querying in your clients to receive continuous updates to queries run on the servers. You can use a CQ in any of your client regions.
-
Configure the client pools you will use for CQs with subscription-enabled set to true.
To have CQ and interest subscription events arrive as closely together as possible, use a single pool for everything. Different pools might use different servers, which can lead to greater differences in event delivery time.
-
Write your OQL query to retrieve the data you need from the
server.
The query must satisfy these CQ requirements in addition to the standard GemFire querying specifications:
- The FROM clause must
contain only a single region specification, with optional iterator variable.
- The query must be a
SELECT expression only, preceded by zero or more IMPORT statements. This means
the query cannot be a statement like "/tradeOrder.name" or "(SELECT * from
/tradeOrder).size".
- The CQ query cannot
use:
- Cross region joins
- Drill-downs into
nested collections
- DISTINCT
- Projections
- Bind parameters
This is the basic syntax for the CQ query:
SELECT * FROM /fullRegionPath [iterator] [WHERE clause]
This example query could be used to get all trade orders where the
price is over $100:
SELECT * FROM /tradeOrder t WHERE t.price > 100.00
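The requirements above can be approximated on the client before the query is submitted. The following is a minimal sketch, assuming a purely textual check is acceptable: CqQueryPrecheck and its violations method are hypothetical names, not part of GemFire, and the regular expressions are heuristics that do not replace the server-side query parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of GemFire: a rough textual pre-check of a
// CQ query string. The server-side query parser remains the real authority.
class CqQueryPrecheck {
    private static final int FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
    // Captures the projection list (group 1) and the FROM clause body (group 2).
    private static final Pattern SELECT_FROM = Pattern.compile(
        "\\bSELECT\\b(.*?)\\bFROM\\b(.*?)(?:\\bWHERE\\b.*)?$", FLAGS);
    private static final Pattern DISTINCT = Pattern.compile("\\bDISTINCT\\b", FLAGS);
    private static final Pattern BIND_PARAM = Pattern.compile("\\$\\d+");
    private static final Pattern STARTS_OK = Pattern.compile("^(SELECT|IMPORT)\\b.*", FLAGS);

    // Returns one message per requirement the query appears to break.
    static List<String> violations(String query) {
        List<String> problems = new ArrayList<>();
        String q = query.trim();
        if (!STARTS_OK.matcher(q).matches()) {
            problems.add("must be a SELECT expression, optionally preceded by IMPORT statements");
        }
        if (DISTINCT.matcher(q).find()) {
            problems.add("DISTINCT is not allowed");
        }
        if (BIND_PARAM.matcher(q).find()) {
            problems.add("bind parameters are not allowed");
        }
        Matcher m = SELECT_FROM.matcher(q);
        if (m.find()) {
            // Ignore DISTINCT here; it is already reported above.
            String projection = DISTINCT.matcher(m.group(1)).replaceAll("").trim();
            if (!projection.equals("*")) {
                problems.add("projections are not allowed; use SELECT *");
            }
            // A comma in the FROM clause suggests a join or a nested collection.
            if (m.group(2).contains(",")) {
                problems.add("FROM clause must name a single region");
            }
        }
        return problems;
    }
}
```

Calling CqQueryPrecheck.violations with the example query above should return an empty list, while a string such as "(SELECT * from /tradeOrder).size" should be reported as not being a plain SELECT expression. Because the check is textual, a string literal that happens to contain DISTINCT or a dollar sign followed by digits would be flagged as well.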
-
Write your CQ listeners to handle CQ events from the server.
Implement
com.gemstone.gemfire.cache.query.CqListener in
each event handler you need. In addition to your main CQ listeners, you might
have listeners that you use for all CQs to track statistics or other general
information.
Note: Be especially careful if you choose to update your cache from your CqListener. If your listener updates the region that is queried in its own CQ, and that region is configured with a pool, the update is forwarded to the server. If the update on the server satisfies the same CQ, it may be returned to the same listener that made the update, which could put your application into an infinite loop. The same scenario can occur with multiple regions and multiple CQs if the listeners are programmed to update each other's regions.
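One possible way to break such a feedback loop is for the listener to remember its own writes and skip the events that echo them back. The following is a minimal sketch under that assumption: SelfWriteGuard is a hypothetical helper, not a GemFire class, and it relies on the value type implementing equals() so that an echoed value can be matched against the recorded one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of GemFire: remembers the writes a listener
// made itself so that the echoed CQ events can be skipped, not re-applied.
class SelfWriteGuard<K, V> {
    private final Map<K, V> pendingSelfWrites = new ConcurrentHashMap<>();

    // Call immediately before the listener writes (key, value) to the region.
    // Keys and values must be non-null.
    void recordSelfWrite(K key, V value) {
        pendingSelfWrites.put(key, value);
    }

    // Call at the top of onEvent with the event's key and new value.
    // Returns true when the event matches a recorded self-write; the record
    // is consumed, so a later genuine event with the same content passes.
    boolean isEcho(K key, V newValue) {
        return newValue != null && pendingSelfWrites.remove(key, newValue);
    }
}
```

In a listener, onEvent would return early when isEcho reports true, and the code path that updates the region would call recordSelfWrite first. This only covers a listener reacting to its own writes; loops that span several listeners and regions would need a shared guard or a different design.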
This example outlines a CqListener that might be used to update a display screen with current data from the server. The listener gets the queryOperation and entry key and value from the CqEvent and then updates the screen according to the type of queryOperation.
// CqListener class
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqListener;

public class TradeEventListener implements CqListener
{
  public void onEvent(CqEvent cqEvent)
  {
    // com.gemstone.gemfire.cache Operation associated with the query op
    Operation queryOperation = cqEvent.getQueryOperation();
    // key and new value from the event
    Object key = cqEvent.getKey();
    TradeOrder tradeOrder = (TradeOrder)cqEvent.getNewValue();
    if (queryOperation.isUpdate())
    {
      // update data on the screen for the trade order . . .
    }
    else if (queryOperation.isCreate())
    {
      // add the trade order to the screen . . .
    }
    else if (queryOperation.isDestroy())
    {
      // remove the trade order from the screen . . .
    }
  }
  public void onError(CqEvent cqEvent)
  {
    // handle the error
  }
  // From CacheCallback
  public void close()
  {
    // close the output screen for the trades . . .
  }
}
When you install the listener and run the query, your
listener will handle all of the CQ results.
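To make the branch structure of onEvent concrete, the following is a minimal sketch of the state a display might keep behind the listener. QueryOp and TradeScreenModel are hypothetical stand-ins introduced only for illustration, so that the sketch runs without GemFire on the classpath; a real listener would read the operation, key, and value from the CqEvent as shown in the example.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the operation type a real listener would read
// from CqEvent.getQueryOperation().
enum QueryOp { CREATE, UPDATE, DESTROY }

// Hypothetical display model keyed by order id (illustration only).
class TradeScreenModel {
    private final Map<String, Double> pricesByOrderId = new TreeMap<>();

    // Mirrors the if / else-if chain in TradeEventListener.onEvent.
    void apply(QueryOp op, String orderId, Double newPrice) {
        switch (op) {
            case CREATE:
            case UPDATE:
                // The order now satisfies the query: show its latest price.
                pricesByOrderId.put(orderId, newPrice);
                break;
            case DESTROY:
                // The order left the result set: the new value is not used.
                pricesByOrderId.remove(orderId);
                break;
        }
    }

    // Returns a copy so callers cannot modify the model's internal state.
    Map<String, Double> snapshot() {
        return new TreeMap<>(pricesByOrderId);
    }
}
```

The destroy branch deliberately ignores the value argument; a listener written this way does not depend on a new value being present when an entry leaves the query's result set.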
-
If you need your CQs to detect whether they are connected to any of the servers that host their subscription queues, implement a CqStatusListener instead of a CqListener.
CqStatusListener extends CqListener, allowing a client to detect when a CQ is connected to or disconnected from the servers. The onCqConnected() method is invoked when the CQ is connected, and again when the CQ is reconnected after being disconnected. The onCqDisconnected() method is invoked when the CQ is no longer connected to any servers.
Taking the example from step 3, we can instead implement a
CqStatusListener:
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqStatusListener;

public class TradeEventListener implements CqStatusListener
{
  public void onEvent(CqEvent cqEvent)
  {
    // com.gemstone.gemfire.cache Operation associated with the query op
    Operation queryOperation = cqEvent.getQueryOperation();
    // key and new value from the event
    Object key = cqEvent.getKey();
    TradeOrder tradeOrder = (TradeOrder)cqEvent.getNewValue();
    if (queryOperation.isUpdate())
    {
      // update data on the screen for the trade order . . .
    }
    else if (queryOperation.isCreate())
    {
      // add the trade order to the screen . . .
    }
    else if (queryOperation.isDestroy())
    {
      // remove the trade order from the screen . . .
    }
  }
  public void onError(CqEvent cqEvent)
  {
    // handle the error
  }
  // From CacheCallback
  public void close()
  {
    // close the output screen for the trades . . .
  }
  public void onCqConnected() {
    // Display connected symbol
  }
  public void onCqDisconnected() {
    // Display disconnected symbol
  }
}
When you install the
CqStatusListener, your listener will be able to
detect its connection status to the servers that it is querying.
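The two status callbacks can also feed a flag that the rest of the application reads, for example to mark displayed data as possibly stale. The following is a minimal sketch under that assumption: CqConnectionIndicator is a hypothetical class that only mimics the two callback names so it can run without GemFire; in a real client these method bodies would live in your CqStatusListener implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not a GemFire class: holds the bodies that the two
// CqStatusListener callbacks might have in a real listener.
class CqConnectionIndicator {
    // Atomic types are used because callbacks may arrive on another thread.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final AtomicInteger disconnectCount = new AtomicInteger(0);

    // Same name as the CqStatusListener callback: the CQ is (re)connected.
    public void onCqConnected() {
        connected.set(true);
    }

    // Same name as the CqStatusListener callback: no server is reachable.
    public void onCqDisconnected() {
        // Count only transitions from connected to disconnected.
        if (connected.getAndSet(false)) {
            disconnectCount.incrementAndGet();
        }
    }

    boolean isConnected() {
        return connected.get();
    }

    int disconnectCount() {
        return disconnectCount.get();
    }
}
```

A display could poll isConnected() to decide whether to show the connected or disconnected symbol, and use disconnectCount() as a simple health metric.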
-
Program your client to run the CQ:
-
Create a
CqAttributesFactory and use it to set your
CqListeners and
CqStatusListener.
-
Pass the CQ's unique name, the CQ query, and the CqAttributes created by the factory to the QueryService to create a new CqQuery.
-
Start the query running by calling one of the execute methods
on the
CqQuery object.
You can execute with or without an initial result set.
-
When you are done with the CQ, close it.
Continuous Query Implementation
// Get cache and queryService - refs to local cache and QueryService
// Create client /tradeOrder region configured to talk to the server

// Create CqAttributes using CqAttributesFactory
CqAttributesFactory cqf = new CqAttributesFactory();

// Create a listener and add it to the CQ attributes
CqListener tradeEventListener = new TradeEventListener();
cqf.addCqListener(tradeEventListener);
CqAttributes cqa = cqf.create();

// Name of the CQ and its query
String cqName = "priceTracker";
String queryStr = "SELECT * FROM /tradeOrder t where t.price > 100.00";

// Create the CqQuery
CqQuery priceTracker = queryService.newCq(cqName, queryStr, cqa);

try
{
  // Execute CQ, getting the optional initial result set
  // Without the initial result set, the call is priceTracker.execute();
  SelectResults sResults = priceTracker.executeWithInitialResults();
  List list1 = sResults.asList();
  for (int i = 0; i < list1.size(); i++)
  {
    MyValueObject obj = (MyValueObject)list1.get(i);
    // ... do something with the entry key and value
  }
} catch (Exception ex)
{
  ex.printStackTrace();
}

// Now the CQ is running on the server, sending CqEvents to the listener
// . . .

// End of life for the CQ - clear up resources by closing
priceTracker.close();
With continuous queries, you can optionally implement:
- Highly available CQs by configuring your servers for high availability.
- Durable CQs by configuring your clients for durable messaging and indicating which CQs are durable at creation.