Technology, January 20, 2015

Kindling: An Introduction to Spark with Cassandra (Part 1)

Erich is the CTO for SimpleRelevance, a company which does dynamic content personalization using all the tools of data science.  Before joining SimpleRelevance, Erich spent many years working on scalable distributed architectures.  In college, he focused on mathematics and computer graphics.  To this day, he still loves studying and talking about mathematics and is currently having a lot of fun with category theory and the functional languages F# and Clojure.


This is an introduction to the relatively new distributed compute platform Apache Spark.  The focus will be on how to get up and running with Spark and Cassandra, with a small example of what can be done with Spark.  I chose to make this the focus for one reason:  when I was trying to learn Spark two months ago, I had difficulty finding articles on how to set up Spark to use Cassandra.  The process is actually not that difficult, but pulling all the steps together required some searching and investigation.  The secondary focus is to give you a small taste of what can be done with Spark, so that once you have the Spark cluster up and running there is motivation to keep exploring and experimenting.

This is based on a talk I gave at the Chicago Cassandra meetup in late October.  I gave that talk to set myself a fixed goal for learning how to use Spark.  This blog post is the reverse of that:  it took quite a bit of work in my free time to get Spark working with Cassandra, so here I hope to lay out the minimum steps needed to get a Spark and Cassandra cluster up and running.

What Is Spark?

Spark is a recent Hadoop successor, and it looks to have taken a lot of lessons from how the Hadoop API was designed.  Spark works with a master/slave configuration, where a lightweight “Master” service acts as an interface to the cluster; it keeps track of the state of each node and wrangles nodes when jobs are submitted.  Spark differs from Hadoop in several ways:  it supports both batch and stream processing, multiple programming languages out of the box (Scala, Java, and Python), in-memory computations, an interactive shell, and a significantly easier-to-use API.  The last three are, in my opinion, the most exciting aspects of Spark.

With in-memory computations, you can tell Spark to cache data in RAM as it is pulled out of data stores (HDFS, SQL, Cassandra, etc.).  After that, any computations performed on that data are done in memory without any expensive queries; this makes analytics much faster than Hadoop.  If you go to http://spark.apache.org, the little graph on the landing page showing the performance advantage over Hadoop illustrates why Spark has in-memory computations.
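As a rough illustration of caching, the Spark shell session below marks an RDD as cached and then runs two actions against it.  It assumes a running Spark shell, where `sc` is the SparkContext the shell provides; the data is made up for the example.

```scala
// Assumes the Spark shell, where `sc` is already defined.
// cache() only marks the RDD to be kept in memory; nothing is computed yet.
val squares = sc.parallelize(1 to 1000000).map(n => n.toLong * n).cache()

// The first action computes the RDD and stores its partitions in memory.
val total = squares.count()

// Later actions reuse the cached partitions instead of recomputing the map.
val largest = squares.max()
```

With the default storage level, partitions that do not fit in memory are simply recomputed when needed, so caching is an optimization rather than a guarantee.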

The interactive shell is used to connect to a Spark cluster and do interactive computations on your data.  This makes experimenting, poking around, and general hacking much easier.  Having a Spark cluster also allows a developer or data scientist to quickly test out code and verify that it works without having to go through a slow deployment process.

Spark’s API design is what truly differentiates it from Hadoop.  The focus is on making interacting with distributed data incredibly elegant with as little boilerplate and configuration code as possible.  This design is founded upon the Resilient Distributed Dataset (referred to as an RDD), which abstracts the data (that is spread across many machines) into a single enumerable data structure.  You program against this data structure exactly as if it were the familiar Lists, Vectors, or Arrays that are built into modern languages.  The RDD makes reasoning about distributed programs straightforward and natural.

The RDD truly shines in how you use it in code.  A very functional programming paradigm is used for transforming and acting upon data, which will be familiar to anyone who has used .NET’s LINQ framework or Java 8’s Streams.  To work with an RDD, you simply chain together a series of transformations (such as filters or maps or sorts) and then use an action to do something with that data (such as grab the first 10 elements).  There are two types of functions: transformations and actions.

Transformations change the data in some way; they may filter the data, or double the value of an integer, or sort the data, or group the data by a key.  It’s important to understand that transformations are lazily evaluated:  no work is done until an action is called which asks Spark to produce a result.  Lazy evaluation also means that Spark will do the bare minimum amount of work to produce the desired result.
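Lazy evaluation can be seen without a cluster.  The sketch below is an analogy using a plain Scala Iterator, not Spark itself:  the counter and the names (`LazyPipeline`, `double`, `build`) are made up for illustration.  Building the pipeline touches no elements, and asking for two results touches only as many as are needed to find them.

```scala
object LazyPipeline {
  // Counts how many elements the mapping step has actually touched.
  var evaluated = 0

  def double(n: Int): Int = { evaluated += 1; n * 2 }

  // Describes the work but performs none of it: Iterator.map and
  // Iterator.filter are deferred until elements are requested.
  def build(): Iterator[Int] =
    (1 to 1000).iterator.map(double).filter(_ % 3 == 0)

  def main(args: Array[String]): Unit = {
    val pipeline = build()
    println(s"after building the pipeline: evaluated = $evaluated")

    // Asking for a result forces evaluation, and only as far as needed.
    val firstTwo = pipeline.take(2).toList
    println(s"firstTwo = $firstTwo, evaluated = $evaluated")
  }
}
```

Spark applies the same idea across a cluster:  the chain of transformations is only a description of work until an action asks for a value.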

Actions are functions which actually pull a value or values from the RDD; this could be asking for the first element, or the first ten elements, or the number of elements in the RDD, or iterating through each element of the RDD.  Many things can be accomplished in Spark simply by chaining a series of transformations together and then iterating with a foreach.  As mentioned above, the Action is what causes Spark to go out and perform a computation.
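To make the distinction concrete, here is a short Spark shell session.  It assumes a running Spark shell, where `sc` is the SparkContext the shell provides; the numbers are arbitrary.

```scala
// Assumes the Spark shell, where `sc` is already defined.
val numbers = sc.parallelize(1 to 100)

// Transformations: each returns a new RDD and runs nothing yet.
val evens = numbers.filter(_ % 2 == 0)
val scaled = evens.map(_ * 10)

// Actions: each one triggers a computation and returns a value to the shell.
scaled.take(3)            // Array(20, 40, 60)
scaled.count()            // 50
scaled.foreach(println)   // prints every element; order is not guaranteed
```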

How to Install Spark with Cassandra

The following steps describe how to set up a server with both a Spark node and a Cassandra node.

Note: For the purposes of this article, Spark and Cassandra will both be running on localhost.

There are two paths for setting up a Spark & Cassandra server:  if you have DataStax Enterprise then you can simply install an Analytics Node and check off the box for Spark or, if you are using the open source version, then you will need to follow these steps.  The second path is a little tricky, and getting the Spark Shell to work with Cassandra required the assistance of the very kind Cassandra community at StackOverflow.

Setting Up Open Source Spark

This assumes you already have Cassandra set up.

  1. Download and set up Spark
    1. Go to http://spark.apache.org/downloads.html.
    2. To make things simple, we will use one of the prebuilt Spark packages.
    3. Choose Spark version 1.2.0 and “Pre-built for Hadoop 2.4” then Direct Download.  This will download an archive with the built binaries for Spark.
    4. Extract this to a directory of your choosing.  I will put mine in ~/apps/spark-1.2
    5. Test Spark is working by opening the Shell
  2. Test that Spark Works
    1. cd into the Spark directory
    2. Run “./bin/spark-shell”.  This will open up the Spark interactive shell program
    3. If everything worked it should display this prompt: “scala>”
    4. Run a simple calculation:
      sc.parallelize(1 to 50).reduce(_ + _)
      which should output 1275.
    5. Congratulations, Spark is working!
    6. Exit the Spark shell with the command “exit”

The Spark Cassandra Connector

To connect Spark to a Cassandra cluster, the Cassandra Connector will need to be added to the Spark project.  DataStax provides their own Cassandra Connector on GitHub and we will use that.

  1. Clone the Spark Cassandra Connector repository: https://github.com/datastax/spark-cassandra-connector
  2. cd into “spark-cassandra-connector”
  3. Build the Spark Cassandra Connector
    1. Execute the command “./sbt/sbt assembly”
    2. This should output compiled jar files to the directory named “target”.  There will be two jar files, one for Scala and one for Java.
    3. The jar we are interested in is the one for Scala: “spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar”.
  4. Move the jar file into an easy to find directory:  I put mine into ~/apps/spark-1.2/jars

To load the connector into the Spark Shell:

  1. From the Spark directory, start the shell with this command:  ./bin/spark-shell --jars ~/apps/spark-1.2/jars/spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar
  2. Connect the Spark Context to the Cassandra cluster:
    1. Stop the default context:
      sc.stop
    2. Import the necessary classes:
      import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
    3. Make a new SparkConf with the Cassandra connection details:
      val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
    4. Create a new Spark Context:
      val sc = new SparkContext(conf)
  3. You now have a new SparkContext which is connected to your Cassandra cluster.

Test that the Spark Connector is working from the Shell

  1. Create a keyspace called “test_spark” in Cassandra
  2. Create a table in the test_spark keyspace:  CREATE TABLE test_spark.test (value int PRIMARY KEY);
  3. Insert some data:  INSERT INTO test_spark.test (value) VALUES (1);
  4. From the Spark Shell run the following commands:
    1. val test_spark_rdd = sc.cassandraTable("test_spark", "test")
    2. test_spark_rdd.first
    3. The output should be:
      res1: com.datastax.spark.connector.CassandraRow = CassandraRow{value: 1}
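If you would rather create the test keyspace, table, and row without leaving the Spark Shell, the connector’s CassandraConnector can run the CQL for you.  This is a sketch, assuming the connector jar is loaded and `sc` is the Cassandra-connected context created earlier; the replication settings are my assumption for a single local node and should be adjusted for a real cluster.

```scala
// Assumes the connector jar is on the classpath and `sc` is connected to Cassandra.
import com.datastax.spark.connector.cql.CassandraConnector

CassandraConnector(sc.getConf).withSessionDo { session =>
  // SimpleStrategy with one replica is an assumption suited to localhost only.
  session.execute(
    "CREATE KEYSPACE IF NOT EXISTS test_spark " +
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute(
    "CREATE TABLE IF NOT EXISTS test_spark.test (value int PRIMARY KEY)")
  session.execute("INSERT INTO test_spark.test (value) VALUES (1)")
}
```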

Opening the Spark Shell

(Many thanks to the people who helped me: http://stackoverflow.com/questions/25837436/how-to-load-spark-cassandra-connector-in-the-shell )

Interacting with Cassandra from Spark

To read data from Cassandra, you create an RDD from a specific table.  This is very simple:

val data = sc.cassandraTable("my_keyspace", "my_table")

This will return an RDD of type CassandraRow, where CassandraRow is a data type which stores a single row from a table as a map from column name to value.
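Individual columns can be read from a CassandraRow by name.  The sketch below reuses the test_spark.test table from the installation steps and assumes it holds only the single row inserted there.

```scala
// Assumes the Spark shell with the connector loaded and `sc` connected to Cassandra.
import com.datastax.spark.connector._

val row = sc.cassandraTable("test_spark", "test").first

// Typed accessors look a column up by name.
row.getInt("value")            // 1, given the single row inserted earlier

// The generic form takes the target type; Option guards against null columns.
row.get[Option[Int]]("value")  // Some(1)
```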

To make interacting with the data from Cassandra more elegant, it is possible to map the rows directly to a custom type which represents an atomic piece of data from the table.  One way to do this is to create a case class which represents the data from the table.  For example, if there were a table of movies consisting of the columns (id int, title text, genres text), then the corresponding case class would be:

case class Movie(id: Int, title: String, genres: String)

Then, to map the Cassandra table directly to that type, you simply call ‘cassandraTable’ and specify the type:

val data = sc.cassandraTable[Movie]("spark_demo", "movies")
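Once the rows are mapped to a type, the usual transformations and actions read naturally.  The following is a sketch, assuming the spark_demo.movies table described above exists; the lowercase field names are chosen to match the column names exactly, and the “Comedy” genre value is a made-up example.

```scala
// Assumes the Spark shell with the connector loaded and `sc` connected to Cassandra.
import com.datastax.spark.connector._

// Field names mirror the column names (id, title, genres).
case class Movie(id: Int, title: String, genres: String)

val movies = sc.cassandraTable[Movie]("spark_demo", "movies")

// Transformations on typed fields, followed by an action to fetch ten titles.
val comedyTitles = movies
  .filter(_.genres.contains("Comedy"))
  .map(_.title)
  .take(10)
```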

Writing data to Cassandra is just as simple:  on an RDD, call the function saveToCassandra and specify the keyspace and the table.  Make sure that the type of the RDD maps to the Cassandra table.

data.saveToCassandra("spark_demo", "movies")

This will save the RDD of type Movie to the movies table in the keyspace spark_demo.
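An RDD of tuples can be written as well, by naming the target columns with SomeColumns.  This is a sketch, assuming the spark_demo.movies table described above exists; the two rows are invented purely for illustration.

```scala
// Assumes the Spark shell with the connector loaded and `sc` connected to Cassandra.
import com.datastax.spark.connector._

// Hypothetical rows; each tuple lines up with (id, title, genres).
val newMovies = sc.parallelize(Seq(
  (1001, "A Hypothetical Film", "Drama"),
  (1002, "Another Made-Up Title", "Comedy")
))

// SomeColumns tells the connector which column each tuple position maps to.
newMovies.saveToCassandra("spark_demo", "movies", SomeColumns("id", "title", "genres"))
```

Because Cassandra writes are upserts, saving a row whose id already exists overwrites that row rather than failing.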

We have gotten Spark set up and running with Cassandra and shown the basics of interacting with Cassandra from Spark, which marks the end of the first part of my article.  This may seem like an abrupt end, but do not worry:  the focus of this post was explaining how to get Spark up and running with Cassandra.  In two weeks, I will be writing another post where we take our Spark+Cassandra stack and start doing cool things with it.


This blog post originally appeared in Planet Cassandra.
