Asynchronous queries with the Java driver
The DataStax Java driver for Cassandra uses an asynchronous architecture. This allows client code to get query results in a non-blocking way, via Future instances. In this post, we take a closer look at this concept, and use it to implement a client-side equivalent to the SELECT...IN query.
Asynchronous query result: ResultSetFuture
Here is a high-level overview of the execution of a client query with Session#executeAsync:
- Apart from sending the query to Cassandra, the driver registers an internal ResponseHandler, which will process the response when it is available. It then gives control back to the caller, returning a ResultSetFuture which represents the future completion of the query. This object implements Java's Future; at this point, its isDone method returns false.
- When Cassandra returns the response, the driver notifies the ResponseHandler (many handlers can be registered for different queries, so the match is made with the stream id, a unique identifier that was initially sent with the request). The handler will in turn complete the future. This is all executed in an I/O thread managed by Netty, the underlying networking framework.
- At some point, the client code will invoke the future's get method to obtain the result. This will block if the future has not yet completed.
You can observe this process with this (admittedly naive) code snippet:
ResultSetFuture future = session.executeAsync("SELECT release_version FROM system.local");
while (!future.isDone()) {
logger.debug("Waiting for request to complete");
}
ResultSet rs = future.get();
logger.debug("Got response: {}", rs.one().getString("release_version"));
On my laptop, the future takes about 4 milliseconds to complete, which gives the main thread time for a few iterations in the loop. Of course, that loop is for demonstration purposes only; you don't need it since the call to get is blocking. get also has a variant that waits for a given amount of time. If you decide to give up on the future after the timeout has elapsed, it's good practice to cancel it:
try {
ResultSet rs = future.get(5, TimeUnit.SECONDS);
... // do something with the results
} catch (TimeoutException e) {
future.cancel(true);
... // the query did not complete within 5 seconds, switch to plan B }
There are also non-blocking approaches, as we'll see in the next section.
Note: the driver is asynchronous by nature; synchronous methods like Session#execute are mere wrappers that call the asynchronous version, then immediately get the future's result (example).
A better future: ListenableFuture
Future is a nice abstraction, but it's a bit limited in its use: you can either check periodically if it's done, or wait for its result in a blocking manner. That's why the ResultSetFutures returned by the Java driver extend ListenableFuture. This interface is part of Google's Guava library; it's a specialized Future that allows the execution of callbacks upon completion.
To illustrate that, let's consider the task of updating a hypothetical GUI with the result of a query:
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
ResultSetFuture future = session.executeAsync("SELECT release_version FROM system.local"); Futures.addCallback(future,
new FutureCallback() { @Override public void onSuccess(ResultSet result) { gui.setMessage("Cassandra version is " + result.one().getString("release_version"));
}
@Override public void onFailure(Throwable t) {
gui.setMessage("Error while reading Cassandra version: " + t.getMessage());
}
},
MoreExecutors.sameThreadExecutor()
);
Note the last argument to addCallback: it is an executor responsible for providing the thread which will execute the callback. With sameThreadExecutor, this will be the client thread if the future has already completed by the time we register the callback, or the Netty I/O thread otherwise. This is fine if the callback is lightweight; for more compute-intensive tasks, consider providing your own executor to avoid blocking I/O threads for too long.
Guava provides several utility methods to work with ListenableFutures, most of which are exposed by the Futures class. In the next section, we're going to see an interesting use for two of them.
Case study: multi-partition query, a.k.a. "client-side SELECT...IN"
A common use-case with Cassandra is to retrieve the same data from various partitions. The most straightforward approach is to use a CQL SELECT...IN query:
CREATE TABLE IF NOT EXISTS users (id uuid PRIMARY KEY, name text);
SELECT * FROM users WHERE id IN (
e6af74a8-4711-4609-a94f-2cbfab9695e5,
281336f4-2a52-4535-847c-11a4d3682ec1,
c32b8d37-89bd-4dfe-a7d5-5f0258692d05
);
This is not necessarily optimal: this query will be sent to a coordinator node, which will then have to query replicas for each partition key. Considering that we have a smart token-aware driver, it would be more efficient to send an individual query for each partition key (SELECT * FROM users WHERE id = ?), which would reach the right replica directly. Then all that's left is to collate the results client-side.
Solution 1: return all the results as a list
First the easy part: given the query string and a list of partition keys, we execute the query for each partition key. This produces a list of futures:
private static List sendQueries(Session session, String query, Object[] partitionKeys) {
List futures = Lists.newArrayListWithExpectedSize(partitionKeys.length);
for (Object partitionKey : partitionKeys)
futures.add(session.executeAsync(query, partitionKey)); return futures;
}
Now we use Guava's successfulAsList to transform the List<Future<ResultSet>> into a Future<List<ResultSet>> (slightly pedantic side note: in functional programming terms, this is similar to the sequence operation on a traversable functor):
public static Future queryAllAsList(Session session, String query, Object... partitionKeys) { List futures = sendQueries(session, query, partitionKeys); return Futures.successfulAsList(futures); } [/code] The client gets a single future containing the list of results: [code gutter="true" language="java"] Future future = ResultSets.queryAllAsList(session, "SELECT * FROM users WHERE id = ?", UUID.fromString("e6af74a8-4711-4609-a94f-2cbfab9695e5"), UUID.fromString("281336f4-2a52-4535-847c-11a4d3682ec1") //... ); for (ResultSet rs : future.get()) { ... // process the result set } [/code] There is one drawback: the compound future only completes after the slowest response has arrived. The client won't have access to any of the results before that. It would be valuable to get the results as they become available, to start processing them right away, or stream them to a consumer. Let's see another approach to fix that.
Solution 2: return a list of futures in completion order
This is almost similar, except that we use another utility method from Guava:
public static List queryAll(Session session, String query, Object... partitionKeys) {
List futures = sendQueries(session, query, partitionKeys);
return Futures.inCompletionOrder(futures);
}
Note: inCompletionOrder is available since Guava 17.0; the Java driver currently uses an older version, so we need to override the dependency in our POM. The client now gets a list of futures, but they are guaranteed to be in completion order. So it can retrieve the results sequentially, with the guarantee that it won't wait unnecessarily while other results were available:
List futures = ResultSets.queryAll(session,
"SELECT * FROM users WHERE id = ?",
UUID.fromString("e6af74a8-4711-4609-a94f-2cbfab9695e5"),
UUID.fromString("281336f4-2a52-4535-847c-11a4d3682ec1") //...
);
for (ListenableFuture future : futures) {
ResultSet rs = future.get();
... // process the result set
}
(The magic behind inCompletionOrder is that the futures it returns are in fact delegates, that get resolved sequentially each time one of the original futures completes — see the source code for more details).
The next solution uses the same approach, but with a different API.
Solution 3: return an RxJava Observable
RxJava describes itself as "a library for composing asynchronous and event-based programs". One of its core abstractions is Observable, which can be seen as a concurrent iterator. An observable emits a sequence of values over time. Observers can register to an observable, to be notified as the values become available.
This translates really well to our case: once again, we start with the list of futures from our asynchronous queries; then we transform each future into an observable, and finally merge all the observables into a single one.
public static Observable queryAllAsObservable(Session session, String query, Object... partitionKeys) { List futures = sendQueries(session, query, partitionKeys); Scheduler scheduler = Schedulers.io(); List observables = Lists.transform(futures, (ResultSetFuture future) -> Observable.from(future, scheduler)); return Observable.merge(observables); } [/code] Note that we need to provide a Scheduler instance to Observable.from, otherwise each individual observator blocks on the corresponding future.
Here's the skeleton code to register an observer:
Observable results = ResultSets.queryAllAsObservable(session, "SELECT * FROM users WHERE id = ?",
UUID.fromString("e6af74a8-4711-4609-a94f-2cbfab9695e5"),UUID.fromString("281336f4-2a52-4535-847c-11a4d3682ec1") //...
);
results.subscribe(new Observer() {
@Override public void onNext(ResultSet resultSet) {
... // process the result set
}
@Override public void onError(Throwable throwable) {
... // process the error
}
@Override public void onCompleted() {
// no more results
}
});
Conclusion
The Java driver allows you to take advantage of its non-blocking nature through its executeAsync method. Guava and RxJava provide powerful combinators to transform and compose these asynchronous results. We hope this article gave you the motivation to further explore the APIs and use reactive patterns in your code. The examples in this article were kept simple for brevity, but they could be expanded in various ways:
- support for composite partition keys;
- better error handling (for example, successfulAsList sets a null element at the position of any future that failed, which is not ideal in our case);
- build an Observable<Row> rather than an Observable<ResultSet> (this is left as an exercise to the reader — hint: you'll need the flatMapIterable method).
The code samples are available in a GitHub repository.
Edit 2014/10/31: provide a Scheduler in solution 3 to make the compound observable truly asynchronous. Thanks Duy Hai Doan for spotting this.