Cassandra User Defined Aggregates using the Python Driver
Introduction
Apache Cassandra 2.2 introduced two useful new features: User Defined Functions (UDFs) and User Defined Aggregates (UDAs). These features allow certain types of computation to occur server-side, directly on the Cassandra cluster. In my previous post, I discussed UDFs and some scenarios that take advantage of them. In this post, I will briefly demonstrate some scenarios where UDAs can be used, and how to work with them via the DataStax Python Driver.
The examples in this post use the same keyspace as the previous post, shown here created via the Python driver:
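Here is a minimal sketch of that setup. It assumes a hypothetical keyspace named udf_demo with SimpleStrategy replication, which suits a single-node test cluster. The helper takes a session object such as the one returned by Cluster(['127.0.0.1']).connect() from the DataStax Python driver, and relies only on Session.execute and Session.set_keyspace.

```python
# Sketch: create (or reuse) the keyspace used in this post.
# The keyspace name and replication settings are hypothetical placeholders.
KEYSPACE = "udf_demo"

CREATE_KEYSPACE_CQL = (
    "CREATE KEYSPACE IF NOT EXISTS {ks} "
    "WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}"
).format(ks=KEYSPACE)


def setup_keyspace(session):
    """Create the keyspace and make it the session's default keyspace.

    `session` is expected to be a cassandra.cluster.Session, for example
    Cluster(['127.0.0.1']).connect().
    """
    session.execute(CREATE_KEYSPACE_CQL)
    session.set_keyspace(KEYSPACE)
```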
User Defined Aggregates (UDAs)
UDAs are aggregate functions that run directly on Cassandra. They are composed of two parts: a UDF (called a 'state function' in the context of UDAs) and the UDA itself, which calls the UDF for each row returned by the query. The state function takes one argument, the state, that is carried between the calls for successive rows, so the aggregate works much like a fold or a reduce. This state argument must be the first argument of the UDF, and its type must also be the UDF's return type. On each subsequent call, the UDA automatically passes the return value of the previous call as the first argument. Here's the syntax for creating a UDA:
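The CQL grammar for CREATE AGGREGATE is shown in the comment below. A small helper then assembles a statement from its clauses. The helper name build_create_aggregate is hypothetical and is not part of the driver. It only builds the statement string, which you would then pass to session.execute.

```python
# CQL grammar for user-defined aggregates (Cassandra 2.2):
#
#   CREATE [OR REPLACE] AGGREGATE [IF NOT EXISTS] [keyspace.]name (arg_type, ...)
#       SFUNC state_function
#       STYPE state_type
#       [FINALFUNC final_function]
#       [INITCOND initial_condition]
#
# build_create_aggregate is a hypothetical helper for illustration only.

def build_create_aggregate(name, arg_types, sfunc, stype,
                           finalfunc=None, initcond=None):
    """Return a CREATE AGGREGATE statement.

    initcond must already be a CQL literal, e.g. "null", "0" or "{}".
    """
    clauses = [
        "CREATE AGGREGATE IF NOT EXISTS {}({})".format(name, ", ".join(arg_types)),
        "SFUNC {}".format(sfunc),
        "STYPE {}".format(stype),
    ]
    if finalfunc is not None:   # FINALFUNC is optional
        clauses.append("FINALFUNC {}".format(finalfunc))
    if initcond is not None:    # when INITCOND is omitted, the state starts as null
        clauses.append("INITCOND {}".format(initcond))
    return "\n".join(clauses)
```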
The SFUNC (or State Function) is the UDF state function that will be called for each row, and STYPE (or State Type) is the return type of that function, which is also the type of the first argument in the next call to the UDF. INITCOND (or Initial Condition) is the initial value passed as the state in the first call to the UDF, and must be of type STYPE. The FINALFUNC (or Final Function) is an optional function that the UDA calls automatically once all rows have been processed by the state function. The input to the FINALFUNC is of type STYPE, namely the output of the last call to the state function. However, the output of the FINALFUNC can be of any type and is not declared in the UDA.
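To make the evaluation order concrete, here is a pure-Python model of how a UDA combines INITCOND, SFUNC and FINALFUNC. It only illustrates the semantics described above and is not how Cassandra is implemented. The helper name evaluate_aggregate is hypothetical.

```python
# Sketch: a pure-Python model of UDA evaluation over the rows a query returns.

def evaluate_aggregate(rows, sfunc, initcond=None, finalfunc=None):
    state = initcond                  # INITCOND seeds the state (None models null)
    for value in rows:                # SFUNC is called once per row...
        state = sfunc(state, value)   # ...and its return value becomes the next state
    # FINALFUNC, if present, receives the last state (of type STYPE) and may
    # return a value of any type; otherwise the final state is the result.
    return finalfunc(state) if finalfunc is not None else state
```

For example, evaluate_aggregate([1, 2, 3], lambda s, v: s + v, initcond=0) folds the rows into 6, and adding finalfunc=str turns that final state into the string "6".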
Like UDFs, UDAs are defined in a specific keyspace, and as such the state functions they use must come from the same keyspace. UDFs and UDAs are executed on the coordinator node in the Cassandra cluster. Because an aggregate scans across all rows returned by a particular query, we must be careful to restrict the query to a specific partition key. Otherwise, rows from many partitions, possibly spread across many nodes, must first be gathered on the coordinator node before the functions and aggregates are executed, resulting in a performance hit.
Examples 1 and 2 below use the following schema:
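Here is a possible version of that schema, created through the driver. The column names and types (an int item_id partition key, a timestamp clustering column, and a double price) are assumptions chosen to fit the examples that follow. They do not reproduce the original listing.

```python
# Sketch: an assumed schema for the prices table.
CREATE_PRICES_CQL = """
CREATE TABLE IF NOT EXISTS prices (
    item_id int,        -- partition key: aggregates stay within one item
    time timestamp,     -- clustering column: one row per price observation
    price double,
    PRIMARY KEY (item_id, time)
)
"""


def create_prices_table(session):
    """Create the prices table using a cassandra.cluster.Session."""
    session.execute(CREATE_PRICES_CQL)
```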
Inserted via the Python Driver:
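The following sketch shows the inserts against the hypothetical prices schema with item_id, time and price columns. It prepares the INSERT once with session.prepare and binds values for each row, which is the usual driver pattern for repeated writes. The prices you pass in are your own sample values; none of the original data is reproduced here.

```python
from datetime import datetime, timedelta, timezone

# Sketch: insert price observations for one item (assumed prices schema).
INSERT_PRICE_CQL = "INSERT INTO prices (item_id, time, price) VALUES (?, ?, ?)"


def insert_prices(session, item_id, prices, start=None):
    """Insert one row per price, with timestamps one minute apart."""
    if start is None:
        start = datetime.now(timezone.utc)
    # Prepared once, then executed with different bound values for each row.
    insert = session.prepare(INSERT_PRICE_CQL)
    for i, price in enumerate(prices):
        session.execute(insert, (item_id, start + timedelta(minutes=i), price))
```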
And the resulting data in the prices table:
Example #1: Using native aggregates
Cassandra also provides some built-in aggregates, called native aggregates, defined in the system keyspace. These include count, min, max, sum, and avg. For example, we can use the avg native aggregate to retrieve the average of the prices for a particular item:
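This sketch calls avg for a single item, assuming the prices table is partitioned by an int item_id. Cassandra labels the result column system.avg(price). The driver's default named-tuple row factory turns that label into the attribute system_avg_price. The helper name average_price is hypothetical.

```python
# Sketch: the native avg aggregate over a single partition (assumed schema).
AVG_PRICE_CQL = "SELECT avg(price) FROM prices WHERE item_id = %s"


def average_price(session, item_id):
    rows = list(session.execute(AVG_PRICE_CQL, (item_id,)))
    # "system.avg(price)" is exposed by the named-tuple rows as system_avg_price.
    return rows[0].system_avg_price
```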
As with UDFs, notice that the column name in the result of the SELECT query is of the form <keyspace>_<aggregate_name>_<column_name_queried_by_aggregate>. UDAs follow the same form. We can also use an alias to give the column a more readable name:
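With aliases, each aggregate column gets a name of our choosing. This sketch uses the same assumed prices schema and combines several native aggregates in one query. The helper name price_summary is hypothetical.

```python
# Sketch: aliasing native aggregate columns with AS (assumed prices schema).
PRICE_SUMMARY_CQL = (
    "SELECT avg(price) AS avg_price, min(price) AS min_price, "
    "max(price) AS max_price FROM prices WHERE item_id = %s"
)


def price_summary(session, item_id):
    """Return (avg, min, max) of the prices for one item."""
    row = list(session.execute(PRICE_SUMMARY_CQL, (item_id,)))[0]
    return row.avg_price, row.min_price, row.max_price
```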
Example #2: Minimum of a column
Suppose we would like to retrieve the minimum price for an item. We could simply use the min native aggregate. Alternatively, we could define our own min aggregate:
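This sketch defines such an aggregate. It assumes the hypothetical names state_min and my_min, along with the assumed prices schema. The function is declared CALLED ON NULL INPUT, so the Java body must handle both a null state and a null input. UDFs are disabled by default in Cassandra 2.2, so enable_user_defined_functions must be set to true in cassandra.yaml before these statements will succeed.

```python
# Sketch: a user-defined min over doubles (hypothetical names state_min, my_min).
CREATE_STATE_MIN_CQL = """
CREATE FUNCTION IF NOT EXISTS state_min(state double, current double)
CALLED ON NULL INPUT
RETURNS double
LANGUAGE java
AS 'if (current == null) return state;
    if (state == null || current < state) return current;
    return state;'
"""

CREATE_MY_MIN_CQL = """
CREATE AGGREGATE IF NOT EXISTS my_min(double)
SFUNC state_min
STYPE double
INITCOND null
"""


def create_min_aggregate(session):
    session.execute(CREATE_STATE_MIN_CQL)  # the state function must exist first...
    session.execute(CREATE_MY_MIN_CQL)     # ...before the aggregate that uses it


def min_price(session, item_id):
    rows = list(session.execute(
        "SELECT my_min(price) AS min_price FROM prices WHERE item_id = %s",
        (item_id,)))
    return rows[0].min_price
```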
In this example, we have defined a state function whose first argument and return type are both double. The second argument, current, is the value passed in by the UDA for the current row. In the UDA, we have declared that it aggregates over a column of type double, with the SFUNC and STYPE set to match the state function. We've also set the INITCOND to null, so on the first call the state function sees a null state and simply returns the current row's value. When the aggregate runs over a single row, that value is the result, which is by definition the minimum of one value.
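The effect of the null INITCOND can be checked with a pure-Python mirror of the state function. Here None stands in for CQL null and functools.reduce plays the role of the UDA. This is an illustration, not Cassandra code. In this model, an empty partition yields the initial condition itself, None.

```python
from functools import reduce


def state_min(state, current):
    """Python mirror of the state function; None plays the role of CQL null."""
    if current is None:        # skip null prices
        return state
    if state is None or current < state:
        return current         # first non-null value, or a new minimum
    return state


def my_min(values):
    # INITCOND null corresponds to reduce's initial value of None.
    return reduce(state_min, values, None)
```

With a single value, my_min([3.0]) returns 3.0 unchanged, matching the one-row case described above.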
Example #3: Aggregating over a column
Consider a more complex example, using the following schema:
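Here is a possible reviews schema. Like the prices schema, it is an assumption and not the original listing. Reviews are partitioned by item_id, each review is identified by a timeuuid, and star_rating is an int.

```python
# Sketch: an assumed schema for the reviews table.
CREATE_REVIEWS_CQL = """
CREATE TABLE IF NOT EXISTS reviews (
    item_id int,          -- partition key: one partition per item
    review_id timeuuid,   -- clustering column: one row per review
    star_rating int,
    PRIMARY KEY (item_id, review_id)
)
"""


def create_reviews_table(session):
    """Create the reviews table using a cassandra.cluster.Session."""
    session.execute(CREATE_REVIEWS_CQL)
```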
Inserted via the Python Driver:
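This sketch shows the inserts for the assumed reviews schema. The timeuuid is generated server-side with the CQL now() function, so only item_id and star_rating are bound. The ratings you pass in are your own sample values.

```python
# Sketch: insert star ratings for one item (assumed reviews schema).
INSERT_REVIEW_CQL = (
    "INSERT INTO reviews (item_id, review_id, star_rating) VALUES (?, now(), ?)"
)


def insert_reviews(session, item_id, star_ratings):
    insert = session.prepare(INSERT_REVIEW_CQL)  # prepared once, reused per row
    for stars in star_ratings:
        session.execute(insert, (item_id, stars))
```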
And the resulting data in the reviews table:
Suppose that, for a particular item, we want to find out how many reviews gave each star_rating, bucketing the ratings into a count per star. Before Cassandra 2.2, we could have done something like:
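This sketch shows the client-side approach with the assumed reviews schema. Every star_rating in the partition is fetched, and collections.Counter tallies them on the client. The helper name star_counts_client_side is hypothetical.

```python
from collections import Counter


def star_counts_client_side(session, item_id):
    """Pre-2.2 approach: fetch every star_rating and count them on the client."""
    rows = session.execute(
        "SELECT star_rating FROM reviews WHERE item_id = %s", (item_id,))
    return dict(Counter(row.star_rating for row in rows))
```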
Here, we pull the star_rating of every row back to the client and perform the computation client-side by iterating through all the results. In Cassandra 2.2, we can move this computation to the server:
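This sketch shows the server-side version. The names state_count_stars and count_stars are hypothetical. The state is a map<int, int> that starts from the empty-map INITCOND {}. The Java body casts the retrieved count to Integer, for the reason given in the next paragraph.

```python
# Sketch: count reviews per star on the server (hypothetical function names).
CREATE_STATE_COUNT_CQL = """
CREATE FUNCTION IF NOT EXISTS state_count_stars(state map<int, int>, star int)
CALLED ON NULL INPUT
RETURNS map<int, int>
LANGUAGE java
AS 'if (star != null) {
        Integer count = (Integer) state.get(star);
        state.put(star, count == null ? 1 : count + 1);
    }
    return state;'
"""

CREATE_COUNT_STARS_CQL = """
CREATE AGGREGATE IF NOT EXISTS count_stars(int)
SFUNC state_count_stars
STYPE map<int, int>
INITCOND {}
"""


def create_star_aggregate(session):
    session.execute(CREATE_STATE_COUNT_CQL)
    session.execute(CREATE_COUNT_STARS_CQL)


def star_counts_server_side(session, item_id):
    rows = list(session.execute(
        "SELECT count_stars(star_rating) AS stars FROM reviews WHERE item_id = %s",
        (item_id,)))
    # The map comes back as a dict-like object; an empty map may arrive as None.
    return dict(rows[0].stars or {})
```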
In this example, we have defined an aggregate over type int, namely the input column star_rating. The aggregate calls a state function that totals and stores the count for each star in its state type, a map<int,int>. Finally, we can simply call our UDA to aggregate the star_ratings and retrieve a dictionary ready to be used in our client. One interesting thing to point out here is that the value retrieved from the state argument must be explicitly cast to an Integer. This is because when the Java code runs on Cassandra, it has no notion of the map's key and value types, so by default the map is treated as a Map<Object,Object>.
Example #4: Using a finalfunc to gather more aggregate data
As a final example, suppose we would like to gather even more aggregate data from the results of the previous example. In addition to the bucketed star_ratings, perhaps we want the percentage of the total count that each star rating accounts for. Before Cassandra 2.2, we would do something similar to the following (assuming we still have the results from the previous bucketing):
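This sketch shows the client-side percentage calculation. It assumes the bucketed counts are a dict mapping each star to its count, such as the dict built from the previous query. The helper name star_percentages is hypothetical.

```python
def star_percentages(star_counts):
    """Pre-2.2 approach: turn {star: count} into {star: percent of all reviews}."""
    total = sum(star_counts.values())
    if total == 0:   # no reviews: avoid dividing by zero
        return {}
    return {star: count * 100.0 / total for star, count in star_counts.items()}
```

For counts of {5: 2, 4: 1, 1: 1}, this returns {5: 50.0, 4: 25.0, 1: 25.0}.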
In Cassandra 2.2, we can neatly roll this into our existing UDA as a FINALFUNC:
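This sketch defines the FINALFUNC and the extended aggregate. The name percent_stars comes from the text. The aggregate name stars_percent, the map<int, double> return type, and the state function name state_count_stars are assumptions; state_count_stars stands in for the Example #3 state function, and the parameter sfunc lets you substitute your own. Because the function is declared RETURNS NULL ON NULL INPUT, a null state yields a null result instead of an error. The Java body uses fully qualified java.util names, as explained below.

```python
# Sketch: a FINALFUNC that converts per-star counts into percentages.
CREATE_PERCENT_STARS_CQL = """
CREATE FUNCTION IF NOT EXISTS percent_stars(state map<int, int>)
RETURNS NULL ON NULL INPUT
RETURNS map<int, double>
LANGUAGE java
AS 'long total = 0;
    for (Object count : state.values()) total += (Integer) count;
    java.util.Map<Integer, Double> percents = new java.util.HashMap<Integer, Double>();
    for (Object star : state.keySet())
        percents.put((Integer) star, ((Integer) state.get(star)) * 100.0 / total);
    return percents;'
"""


def create_percent_aggregate(session, sfunc="state_count_stars"):
    session.execute(CREATE_PERCENT_STARS_CQL)
    # Same SFUNC, STYPE and INITCOND as before, plus the FINALFUNC.
    session.execute(
        "CREATE AGGREGATE IF NOT EXISTS stars_percent(int) "
        "SFUNC {} STYPE map<int, int> FINALFUNC percent_stars INITCOND {{}}"
        .format(sfunc))


def star_percentages_server_side(session, item_id):
    rows = list(session.execute(
        "SELECT stars_percent(star_rating) AS percents FROM reviews WHERE item_id = %s",
        (item_id,)))
    return dict(rows[0].percents or {})
```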
The SFUNC, STYPE and INITCOND are all the same as before. However, we have now defined a FINALFUNC, percent_stars, which takes as input the state map<int,int> resulting from the final call to the SFUNC and performs the percentage calculations. Once again, note that the state map passed into the FINALFUNC does not know the types of its keys and values when they are retrieved, so we must cast the values properly before storing them in the resulting map.
One more interesting thing to note here is that we have used the fully qualified names for the Map and HashMap classes in Java. This is because only essential Java language classes are loaded, and other packages such as java.util are not imported by default when this code runs. Explicitly importing a package in our code will not work either: when Cassandra expands our code into full, compilable Java source, our code simply becomes the body of a method within a larger generated class. Below is the generated code for our FINALFUNC when we attempted to import java.util.* (note the import statement incorrectly placed inside the method):
Conclusion
User Defined Functions and User Defined Aggregates are powerful new features introduced in Cassandra 2.2. They help to both conserve network bandwidth and cut down on client-side computations by performing functions and aggregations directly at the data source. Grab the latest Cassandra and try out UDFs and UDAs to simplify your client-side code!