User Defined Aggregations with Spark in DSE 5.0
There are already a couple of blog posts and presentations about UDF/UDA. But for those who do not know how to use Apache Cassandra™ User Defined Functions (UDF) and User Defined Aggregates (UDA), here is a short introduction on how to use them from Apache Spark™ to push down partition-level aggregations.
Introduction to User Defined Functions (UDF)
Apache Cassandra™ 2.2 introduced User-Defined-Functions and User-Defined-Aggregates, which allow users to write their own scalar functions and use these to build their own aggregations. UDF and UDA are executed on the coordinator, i.e. the node that executes the query on behalf of the client. UDF, UDA and the built-in aggregations and functions (e.g. count, min, max, avg) are applied to the result with respect to the requested consistency level. In other words, Cassandra first performs the read operations as it would without the functions and then applies the functions/aggregations to the result set.
UDF are pure scalar functions: they depend only on their input arguments and produce a single result. One or more columns can be passed into a UDF or UDA. UDF should really be written in Java (rather than JavaScript), as discussed in the sandbox section below.
In order to sum up two column values, a UDF could look like this:
CREATE FUNCTION sum_two(a int, b int)
  RETURNS NULL ON NULL INPUT
  RETURNS int
  LANGUAGE java
  AS 'return a + b;';
It could then be used like this:

SELECT key, sum_two(column_a, column_b) AS the_sum FROM some_table WHERE key = 123;
Aggregations work on multiple rows and produce a single row. In order to do that, an aggregation needs an initial state. For each row that has been read, the aggregation's state function is called, taking both the current state and the value(s) from the current row and returning the new state. After all rows are processed, an optional final function converts the last state into the aggregation's result - otherwise the last state is the aggregation's result.
For example:
CREATE AGGREGATE sum_it(int)
  SFUNC sum_two
  STYPE int
  INITCOND 0;
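Using the same hypothetical table as in the UDF example above, the aggregate sums the column_a values of all rows in the partition:

SELECT key, sum_it(column_a) AS the_total FROM some_table WHERE key = 123;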
A more thorough description of UDF and UDA is available here.
User Defined Functions (UDF) and User Defined Aggregates (UDA) Background
UDF Compilation
Each Java UDF will be compiled to a Java class - so it’s bytecode for the Java Virtual Machine (JVM). In order to compile the UDF, we need a Java source file. Basically, we only put the UDF code into a Java source template and add some necessary (de)serialization code. Package and class names are random. Take a look at the source of that template here.
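Purely as an illustration of the idea - the package, class and method names below are made up for this post and do not reflect the actual template in the Cassandra source - the generated source conceptually looks like this:

// Illustrative sketch only - names and signatures are invented, not the real template.
// The generated class wraps the user-provided UDF body in a method and surrounds it
// with code that converts between Cassandra's serialized values and Java types.
public final class C_udf_a1b2c3   // random package/class name
{
    public static Integer execute(Integer a, Integer b)
    {
        // ---- user-provided UDF body is pasted in here ----
        return a + b;
        // ---- end of user-provided UDF body ----
    }
}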
Scripted UDF are just compiled as specified in the code part of the CREATE FUNCTION statement.
The Sandbox
For Apache Cassandra™ 2.2 we put the label experimental on both UDF and UDA, and you had to explicitly enable UDF in cassandra.yaml. In fact, you can do anything you want with UDF in C* 2.2 - apart from permissions via authentication/authorization, there is nothing that protects the node from an evil UDF.
For Apache Cassandra™ 3.0, and therefore DataStax Enterprise 5.0, we added a sandbox for UDF. That sandbox ensures that UDF do not do evil things. But you still have to explicitly enable UDF in cassandra.yaml.
So, what does evil mean? In short, an evil UDF is one that is not pure as described in CASSANDRA-7395. Pure UDF get input parameter values, operate on these values and return a result. UDF depend only on the input parameters. There are no side effects and no exposure to C* internals.
Let’s recap what the sandbox is for and how it protects the system:
- no I/O (disk, files, network, punch card)
- no thread management
- don't exit or halt or freeze the JVM
- no access to internal classes (like org.apache.cassandra.*, com.datastax.*, etc.)
- no access to 3rd party libraries (like com.google.*, sigar, JNA, etc.) - but access to the required Java Driver classes is allowed (not all driver classes)
- no use of locks and synchronized keyword
- deny creation of UDF that somehow try to inject other code like a static block in Java UDF
- detect and safely stop UDF running too long
- detect and safely stop UDF that consume too much heap
The straight-forward approach could be to use the Java Security Manager and that’s it, right? ... Sorry, that is definitely not enough. In fact, the Java Security Manager is just a tiny part of the sandbox. For example, it does not protect you from creating a new thread, if the current thread is not in the root thread group. Additionally, it is not able to apply a runtime or heap quota.
The most important parts of the sandbox are the Java bytecode inspection (for Java UDF) and restricted class loading (for all UDF) introduced in DSE 5 (Apache Cassandra™ 3.0).
The bytecode inspection detects evil things like the injection of a static code block that would be executed when the UDF is created, and prevents the creation of such a UDF. It also detects usages of the synchronized keyword, among other things.
Such a UDF will not pass validation:
CREATE FUNCTION udf_func(input int)
  CALLED ON NULL INPUT
  RETURNS int
  LANGUAGE java
  AS $$ return input; } static { System.exit(0); $$;

InvalidRequest: code=2200 [Invalid query] message="Could not compile function 'udfdemo.udf_func' from Java source: org.apache.cassandra.exceptions.InvalidRequestException: Java UDF validation failed: [static initializer declared]"
Detecting UDF executions that run too long or consume too much heap is not rocket science - it is about getting the consumed CPU time and heap values from the JVM. The problem is not to detect such a situation - the problem is how to handle the situation. Naively one could just kill the thread, right? But the answer to that is - No. Killing a thread is almost never the solution. In fact, it destabilizes the whole process in every programming language.
All currently released versions of Apache Cassandra™ can detect runtime and heap quota violations and fail fast. Fail fast in this case means that a non-recoverable situation has been detected by Cassandra and the only "way out" is to stop the node. Each UDF execution is performed in a separate thread pool.
Our current proposal is to manipulate the compiled bytecode and inject some guardian code that checks the runtime and heap quotas and aborts the UDF if they are exceeded. Additionally, this allows us to execute Java UDF directly without the need for a separate thread pool. Unfortunately, this only works for Java UDF and not for JavaScript or any other JSR 223 provider. Evil JavaScript code can only be detected - and that still means fail fast and stop the node.
Since Java and scripted UDF fundamentally differ from the sandbox's point of view, there is a second option in cassandra.yaml that needs to be enabled when you want to allow scripted UDF in addition to Java UDF.
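For reference, the two options in cassandra.yaml look like this (both are disabled by default):

enable_user_defined_functions: true
# only needed if you really want scripted (JSR 223) UDF as well:
enable_scripted_user_defined_functions: true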
Why all this effort?
The answer is two-fold. First, imagine that one accidentally coded an endless loop (e.g. a loop with the wrong break condition) or produced an infinitely growing list or string. In this case, the goal is to not let the DSE/Cassandra cluster fail on coding bugs (you know, there is nothing like bug-free code).
Second, and probably the most important goal, is to prevent intentional attacks via UDF. Since there is nothing like a perfect system, UDF still need to be explicitly enabled in cassandra.yaml. Last but not least, please be aware that detecting or even preventing bad things in scripted UDF is really difficult, nearly impossible. Therefore, think carefully before enabling the separate cassandra.yaml option for scripted UDF.
Aggregation is not analytics
People sometimes get confused about aggregation in a distributed database. Somehow they assume that aggregation in a distributed database is the essential building block for distributed analytics. But analytics is so much more.
Distributed analytics in Apache Spark™ uses resilient distributed datasets (RDD), which are immutable. RDD come along with processing instructions. Spark can then build a directed acyclic graph (DAG) and distribute the work if and where appropriate.
Analytics itself supports map-reduce, grouping, aggregation, re-partitioning and a whole zoo of further operations on the data, as well as distributed state. Nowadays analytics has to support many programming languages like Scala, Java, Python and R to solve a specific problem in the most efficient way. Additionally, it has to support different kinds of sources and targets like Apache Cassandra™, Kafka, relational databases, and flat files in various formats like Parquet, CSV or tabular.
Another false assumption that I sometimes hear is that UDF and UDA add magic and are able to work on a vast number of partitions. Although that would be really cool, it is just not true.
And frankly, why should we build another analytics engine inside Cassandra? Again, UDF and UDA are building blocks for something bigger. Database and analytics are related - but not the same thing.
Code walkthrough
The following lines are meant to provide a simple example of how to benefit from UDF and UDA. This example assumes a simple sensor network measuring weather data like temperature. The minimum requirement for the code walkthrough is a DSE 5.0 single node "cluster" with analytics and UDF enabled. In cassandra.yaml, set enable_user_defined_functions: true before you issue dse cassandra -k to start DSE with analytics enabled.
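In short, the setup boils down to the following (a sketch; exact paths and commands depend on your installation):

# in cassandra.yaml
enable_user_defined_functions: true

# start DSE with analytics (Spark) enabled, then open a CQL shell for the walkthrough
dse cassandra -k
cqlsh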
The first thing needed is a table and just enough data for this example.
CREATE KEYSPACE udfdemo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE udfdemo;

CREATE TABLE measurements (
  sensor text,
  time_bucket text,
  ts timestamp,
  temp float,
  PRIMARY KEY ( (sensor, time_bucket), ts));

INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-111', '2016-04-26', '2016-04-26 12:00:00', 7.8);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-111', '2016-04-26', '2016-04-26 13:00:00', 8.3);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-111', '2016-04-26', '2016-04-26 14:00:00', 9.2);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-111', '2016-04-27', '2016-04-27 12:00:00', 10.2);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-111', '2016-04-27', '2016-04-27 13:00:00', 9.3);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-111', '2016-04-27', '2016-04-27 14:00:00', 9.9);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-222', '2016-04-26', '2016-04-26 12:00:00', 21.9);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-222', '2016-04-26', '2016-04-26 13:00:00', 20.2);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-222', '2016-04-26', '2016-04-26 14:00:00', 21.5);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-222', '2016-04-27', '2016-04-27 12:00:00', 22.4);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-222', '2016-04-27', '2016-04-27 13:00:00', 24.0);
INSERT INTO measurements (sensor, time_bucket, ts, temp) VALUES ('11-222', '2016-04-27', '2016-04-27 14:00:00', 23.7);

SELECT * FROM measurements;

 sensor | time_bucket | ts                       | temp
--------+-------------+--------------------------+------
 11-111 |  2016-04-27 | 2016-04-27 10:00:00+0000 | 10.2
 11-111 |  2016-04-27 | 2016-04-27 11:00:00+0000 |  9.3
 11-111 |  2016-04-27 | 2016-04-27 12:00:00+0000 |  9.9
 11-111 |  2016-04-26 | 2016-04-26 10:00:00+0000 |  7.8
 11-111 |  2016-04-26 | 2016-04-26 11:00:00+0000 |  8.3
 11-111 |  2016-04-26 | 2016-04-26 12:00:00+0000 |  9.2
 11-222 |  2016-04-27 | 2016-04-27 10:00:00+0000 | 22.4
 11-222 |  2016-04-27 | 2016-04-27 11:00:00+0000 |   24
 11-222 |  2016-04-27 | 2016-04-27 12:00:00+0000 | 23.7
 11-222 |  2016-04-26 | 2016-04-26 10:00:00+0000 | 21.9
 11-222 |  2016-04-26 | 2016-04-26 11:00:00+0000 | 20.2
 11-222 |  2016-04-26 | 2016-04-26 12:00:00+0000 | 21.5
Globally, some people use Celsius to measure temperature, others prefer Fahrenheit. Since the sensors report Celsius, a nice thing to have is a function that converts Celsius to Fahrenheit.
CREATE FUNCTION celsius_to_fahrenheit(temp float)
  RETURNS NULL ON NULL INPUT
  RETURNS float
  LANGUAGE java
  AS 'return temp * 1.8f + 32;';
To return the temperature in Fahrenheit, we need to apply the celsius_to_fahrenheit UDF to the temp column:
SELECT sensor, time_bucket, ts, temp as temp_celsius, celsius_to_fahrenheit(temp) as temp_fahrenheit FROM measurements;

 sensor | time_bucket | ts                       | temp_celsius | temp_fahrenheit
--------+-------------+--------------------------+--------------+-----------------
 11-111 |  2016-04-27 | 2016-04-27 10:00:00+0000 |         10.2 |           50.36
 11-111 |  2016-04-27 | 2016-04-27 11:00:00+0000 |          9.3 |           48.74
 11-111 |  2016-04-27 | 2016-04-27 12:00:00+0000 |          9.9 |           49.82
 11-111 |  2016-04-26 | 2016-04-26 10:00:00+0000 |          7.8 |           46.04
 11-111 |  2016-04-26 | 2016-04-26 11:00:00+0000 |          8.3 |           46.94
 11-111 |  2016-04-26 | 2016-04-26 12:00:00+0000 |          9.2 |           48.56
 11-222 |  2016-04-27 | 2016-04-27 10:00:00+0000 |         22.4 |           72.32
 11-222 |  2016-04-27 | 2016-04-27 11:00:00+0000 |           24 |            75.2
 11-222 |  2016-04-27 | 2016-04-27 12:00:00+0000 |         23.7 |           74.66
 11-222 |  2016-04-26 | 2016-04-26 10:00:00+0000 |         21.9 |           71.42
 11-222 |  2016-04-26 | 2016-04-26 11:00:00+0000 |         20.2 |           68.36
 11-222 |  2016-04-26 | 2016-04-26 12:00:00+0000 |         21.5 |            70.7
In order to get the average temperature per day, we would need a UDA for our average. (Side note: Apache Cassandra™ 3.0/DSE 5.0 already has built-in aggregations like min, max and avg.)
An aggregate needs at least a state function that combines the current (or initial) state with values from a row. Additionally it may require a final function that produces the result to be returned from the last state value. To calculate an average we need a state that sums up all values and counts the number of values. In this example we just use a tuple with an int and a double for the state.
First, we need the UDF that maintains the current state of the aggregation. It is called avg_state in the following code snippet and takes the state as the first argument and the column value from the current row as the second argument. The next line with RETURNS NULL ON NULL INPUT defines that the method must not be called if any of its arguments is null. The opposite would be CALLED ON NULL INPUT and you would have to deal with null values. But in our case, considering null values does not make sense. The 3rd line RETURNS tuple<int, double> defines the return type, which must be the same as the type of the first argument, because a UDA state function takes the state as the first argument and returns the new state. LANGUAGE java defines that the UDF code is Java code. The following lines define the UDF code, which increments the value counter (the first field of the state tuple) and adds the value to the second field of the state tuple.
CREATE OR REPLACE FUNCTION avg_state(state tuple<int, double>, val float)
  RETURNS NULL ON NULL INPUT
  RETURNS tuple<int, double>
  LANGUAGE java
  AS $$
    state.setInt(0, state.getInt(0) + 1);
    state.setDouble(1, state.getDouble(1) + val);
    return state;
  $$;
Since we want the average as a floating point number as the result of the aggregation, we have to convert the last state using a final function, which is called avg_final in this example. A final function just takes one argument: the last state. For this example, it is sufficient to divide the sum of all values by the number of values.
CREATE OR REPLACE FUNCTION avg_final(state tuple<int, double>)
  RETURNS NULL ON NULL INPUT
  RETURNS float
  LANGUAGE java
  AS 'return (state.getInt(0) == 0) ? 0f : (float)(state.getDouble(1) / state.getInt(0));';
The user defined aggregate itself takes a couple of things. First, it declares the argument type; argument names are not necessary. The state type, in our case the tuple of int and double, is defined in line 3. The state type is the first argument of the state function and the remaining state function argument types are those of the aggregate itself. In this example the state function arguments are tuple<int, double> and float - these are the argument types of the declared state function avg_state. The final function, needed to convert the last state into a "simple" float, is declared in line 4 and is avg_final. Line 5 defines the initial value for the state, which is 0 for both the count and the sum of values in the tuple.
CREATE AGGREGATE temp_avg(float)
  SFUNC avg_state
  STYPE tuple<int, double>
  FINALFUNC avg_final
  INITCOND (0, 0);
To retrieve the average temperature for the first sensor on 2016-04-27, we need the following CQL:
SELECT sensor, time_bucket, temp_avg(temp) FROM measurements WHERE sensor='11-111' AND time_bucket='2016-04-27';

 sensor | time_bucket | udfdemo.temp_avg(temp)
--------+-------------+------------------------
 11-111 |  2016-04-27 |                    9.8
Oh, you want it in Fahrenheit? Use our celsius_to_fahrenheit function!
SELECT sensor, time_bucket, celsius_to_fahrenheit(temp_avg(temp)) FROM measurements WHERE sensor='11-111' AND time_bucket='2016-04-27';

 sensor | time_bucket | udfdemo.celsius_to_fahrenheit(udfdemo.temp_avg(temp))
--------+-------------+--------------------------------------------------------
 11-111 |  2016-04-27 |                                                  49.64
The nice thing about UDA, and the other built-in aggregations, is that only the aggregated result is sent over the wire to the client. This may become important when there is more data to aggregate.
Spark and UDA Walkthrough
Combining the demo above with Apache Spark™ is straight-forward. The requirements are DSE 5.0 or newer with enable_user_defined_functions: true set in cassandra.yaml and DSE Analytics enabled using dse cassandra -k.
In order to run Spark, start the Spark shell using the dse spark command. It automatically creates a Spark context and connects to Apache Cassandra™ and Apache Spark™ in DSE Analytics. To verify that the connection is set up correctly and the schema is available, execute :showSchema udfdemo. It should print the following output.
scala> :showSchema udfdemo
========================================
 Keyspace: udfdemo
========================================
 Table: measurements
----------------------------------------
 - sensor      : String (partition key column)
 - time_bucket : String (partition key column)
 - ts          : java.util.Date (clustering column)
 - temp        : Float
The next example assumes that we get (or have) a list of sensor and day tuples for which the average temperature shall be computed. Since sensor and day are the partition key of the measurements table, pushing down the aggregation (computation of the average temperature per day per sensor) into Apache Cassandra™ is absolutely fine. It is a very cheap computation and saves some bandwidth on the wire between Apache Cassandra™ and Apache Spark™.
Note: I've omitted the scala> prefix that appears in the Spark shell to make it easier to copy & paste the code.
- Line #1 just sets up the sequence with the sensor and day tuples.
- Line #2 creates an RDD from the sequence so Spark can parallelize the work on the sequence.
- Line #3 joins the sensor and day tuples with the CQL partitions in the measurements table. SomeColumns defines the columns to be returned, which includes the call to our User Defined Aggregate temp_avg operating on the temp column.

Note that the feature to call User Defined Aggregates and User Defined Functions has been introduced in spark-cassandra-connector using FunctionCallRef in version 1.5.0-M3 of the connector. DSE 5.0 comes with version 1.6.
val sensorsAndDays = Seq( ("11-111", "2016-04-26"), ("11-111", "2016-04-27"), ("11-222", "2016-04-26"), ("11-222", "2016-04-27") )
val sensorsAndDaysRDD = sc.parallelize(sensorsAndDays.map( (_, "") ))
val result = sensorsAndDaysRDD.
  map( _._1 ).
  joinWithCassandraTable("udfdemo", "measurements",
    SomeColumns("sensor", "time_bucket",
      FunctionCallRef("temp_avg", Seq(Right("temp")), Some("avg_temp"))))

result.collect.foreach(println)

((11-111,2016-04-26),CassandraRow{sensor: 11-111, time_bucket: 2016-04-26, avg_temp: 8.433333})
((11-111,2016-04-27),CassandraRow{sensor: 11-111, time_bucket: 2016-04-27, avg_temp: 9.8})
((11-222,2016-04-26),CassandraRow{sensor: 11-222, time_bucket: 2016-04-26, avg_temp: 21.2})
((11-222,2016-04-27),CassandraRow{sensor: 11-222, time_bucket: 2016-04-27, avg_temp: 23.366667})
Wrap-up
UDF and UDA are nice building blocks for new features and also nice to be used from analytics code like Spark. But please do not forget that the execution of UDA will always take place on the coordinator node. This is not because we are lazy but because we still have to respect the consistency level provided with the query. Also, only the coordinator can provide consistency level behaviors and guarantees. Under the covers we still perform normal reads but apply the UDF or UDA on top of these reads.
The UDF sandbox is pretty solid in my opinion and will become even better. Although UDF is disabled by default in cassandra.yaml, enabling Java UDF and having the sandbox should provide enough security.
Some things to keep in mind and really respect:
- If you intend to use JavaScript UDF, use Java UDF - really.
- Check your UDF thoroughly. Long running UDF as well as UDF consuming a lot of heap will cause the node to "fail fast" - i.e. stop the node, and probably not just one node...
- UDA are absolutely fine if applied to a single partition. This is nothing new - single-partition reads are what you should do with Apache Cassandra™ in general.
- UDA are a nice building block for other analytics engines like Spark. So you should perform single-partition aggregations in Cassandra and compute the grand aggregate in Spark, as sketched after this list.
- Do not misuse UDF to perform really expensive computations.
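To illustrate that last pattern, here is a minimal sketch building on the result RDD from the Spark walkthrough above: the daily (single-partition) averages are computed by the temp_avg UDA in Cassandra, and Spark computes a grand aggregate on top of them - here simply the maximum daily average per sensor, chosen purely as an illustration.

// reuse the per-partition daily averages from the temp_avg UDA (the `result` RDD above)
// and let Spark compute a grand aggregate - here the maximum daily average per sensor
val maxDailyAvgPerSensor = result.
  map { case ((sensor, day), row) => (sensor, row.getFloat("avg_temp")) }.
  reduceByKey((a, b) => math.max(a, b))

maxDailyAvgPerSensor.collect.foreach(println)
// expected, given the data above:
// (11-111,9.8)
// (11-222,23.366667)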