September 29, 2014

Interactive Advanced Analytics with DSE and Spark MLlib

Update May 21, 2015: DataStax Enterprise (DSE) version 4.7 was just released this week and includes official support for the Apache Spark™ MLlib integration. In prior DSE releases (4.5 and 4.6), it was an experimental feature (available for those who want to learn and experiment with it, but not yet recommended for use in production).

DataStax Enterprise (DSE) 4.5 can now perform in-memory analytics using an integrated Apache Spark™. Spark has proven performance on both batch and interactive analytics, and it also supports streaming from external sources, which makes it a powerful real-time analytics platform. Starting a Spark cluster is as simple as editing one line in the DSE configuration file or starting DSE with the `dse cassandra -k` command. See the DSE documentation for full installation and development instructions.

Apache Spark™ is written in Scala but also supports other languages such as Python and Java. For those apprehensive about Scala, there is a good article about accessing Apache Cassandra® using the Spark Cassandra Connector Java API. Data scientists who are more familiar with languages such as R, MATLAB, SAS or Octave will likely be more comfortable with Scala than with Java. Data analysis tools like R and MATLAB provide an interactive shell for working with data and usually come bundled with a comprehensive set of machine learning algorithms and libraries, but they typically run on a single machine, which limits their scalability and performance. A Spark cluster, by contrast, spreads both the data and the computation across many nodes. The Spark interactive shell (based on the Scala REPL) looks similar to the R shell and makes getting started with Spark easy. There are a number of Scala tutorials available to quickly get up to speed with the Scala syntax.

Spark contains a number of libraries for data streaming, graphs and machine learning.

In this article we’ll focus on Spark MLlib, Spark’s machine learning module, which provides algorithms for tasks such as classification, regression, clustering and collaborative filtering.

MLlib is under rapid development, and new algorithms are being added regularly. I’d like to show how to do advanced analytics with Spark and Cassandra by solving a classic machine learning task: I will build a classifier for the Iris flower data set using the Naive Bayes algorithm.

Prepare and Save the Data Set to Apache Cassandra

The Iris flower data set is one of the most commonly used data sets in machine learning tutorials. It consists of 50 samples from each of three species of Iris, with 4 features measured on each sample. This is not a “Big Data” example, but it demonstrates the fundamental techniques that can be used at scale. You can generate more data if needed. In this example we will show how to store data in Apache Cassandra®, load it back into Spark, and train our model. The result is a Naive Bayes classifier that predicts a flower’s species from the 4 feature measurements.

Normally you already have data in Cassandra to analyze, so this part of the article is optional. You can put data directly into Cassandra through cqlsh or any Cassandra driver. For consistency, I will do it from the Spark shell using the Spark Cassandra Connector.

First, put the data set on a shared file system that is accessible from all cluster nodes. CFS (Cassandra File System) is a natural choice, and it is available out of the box in DSE. The `tail -n +2` command skips the CSV header line to make parsing easier.

wget http://www.heatonresearch.com/dload/data/iris.csv
tail -n +2 iris.csv | dse hadoop fs -put - iris.csv

Start the interactive Spark shell. Everything else will be done in the shell.

dse spark

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
Creating SparkContext...
Created spark context..
Spark context available as sc.
Type in expressions to have them evaluated.
Type :help for more information.
scala>

DSE Spark creates a preconfigured SparkContext in the variable sc that connects Spark to Cassandra through the Spark Cassandra Connector. The connector adds methods for reading Cassandra tables into Spark RDDs and saving RDDs back to Cassandra. A Resilient Distributed Dataset (RDD) is Spark’s primary abstraction for a distributed data set. It supports a wide set of operations such as filter, map, reduce, take and cogroup. Transformations run in parallel and are lazy: they are executed only when you call an action such as collect() or one of the saveAs...() methods to get a final result.
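As a rough analogy only (plain Scala collections, no Spark involved), a collection `view` behaves much like an RDD transformation: calling `map` on it records the work without doing it, and the function runs only when something forces the result, much as an RDD waits for an action such as collect(). The `calls` counter and the two sample rows are illustrative assumptions used to make the laziness visible.

```scala
// Plain-Scala analogy for lazy RDD transformations; no Spark involved.
var calls = 0 // counts how many times the mapping function has run

val rows = List("5.1,3.5,1.4,0.2,Iris-setosa", "4.9,3.0,1.4,0.2,Iris-setosa")

// Like an RDD transformation: describing the map does not run it yet.
val lengths = rows.view.map { row => calls += 1; row.length }
val callsBeforeAction = calls // still 0

// Like an action such as collect(): forcing the view runs the function.
val result = lengths.toList
val callsAfterAction = calls // now equals rows.size
```

Mutating a local variable like `calls` from inside a real RDD closure would not update the driver’s copy, because Spark ships closures to executors; the counter only works here because everything runs in a single JVM.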

Before we continue, it is useful to define a case class that wraps the data. The connector methods transparently map this class to and from Cassandra rows.

case class Iris(
    id: java.util.UUID,
    sepal_l: Double,
    sepal_w: Double,
    petal_l: Double,
    petal_w: Double,
    species: String
)

The “id” field is not in the original data set, but a unique key is needed to store data in Cassandra.

Load data from the file

val data = sc.textFile("iris.csv")

Parse the data and generate a random id for each Iris object:

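val parsed = data.filter(!_.isEmpty).map { row =>   // skip blank lines
    val splitted = row.split(",")
    // the first four columns are the measurements
    val Array(sl, sw, pl, pw) = splitted.slice(0, 4).map(_.toDouble)
    // the fifth column is the species name; the id is a fresh random UUID
    Iris(java.util.UUID.randomUUID(), sl, sw, pl, pw, splitted(4))
}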

Let’s print a couple of rows to verify the parsed data:

parsed.take(2).foreach(println)

Iris(3799e309-a6dc-4e0c-b319-7bfcb93040c2,5.1,3.5,1.4,0.2,Iris-setosa)
Iris(e14cbb0b-14e7-40bd-a950-a9265594f1f5,4.9,3.0,1.4,0.2,Iris-setosa)
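The parsing map shown earlier throws an exception, and fails the task, if any line is malformed (a non-numeric value or too few columns). A more defensive variant, sketched here as an assumption rather than the article’s approach, moves the parsing into a hypothetical `parseIris` function that returns an Option; this also makes the logic easy to try on sample lines outside Spark. The snippet repeats the Iris case class so it compiles on its own.

```scala
import java.util.UUID
import scala.util.Try

// Local copy of the article's case class so this snippet compiles on its own.
case class Iris(id: UUID, sepal_l: Double, sepal_w: Double,
                petal_l: Double, petal_w: Double, species: String)

// Hypothetical helper: parse one CSV line, or return None if it is malformed.
def parseIris(row: String): Option[Iris] = {
  val fields = row.split(",").map(_.trim)
  if (fields.length < 5) None // blank or truncated line
  else Try {
    val Array(sl, sw, pl, pw) = fields.slice(0, 4).map(_.toDouble)
    Iris(UUID.randomUUID(), sl, sw, pl, pw, fields(4))
  }.toOption // a non-numeric measurement becomes None instead of an exception
}

val parsedGood = parseIris("5.1,3.5,1.4,0.2,Iris-setosa")
val headerLine = parseIris("sepal_l,sepal_w,petal_l,petal_w,species")
val blankLine = parseIris("")
```

In the Spark shell, where Iris is already defined, you would drop the case class and could write something like `data.flatMap(line => parseIris(line).toList)`; converting the Option with toList keeps the argument to flatMap an ordinary collection.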

The data is now ready to be processed by Spark or saved to Cassandra. I will store it in Cassandra and then load it back.

The Spark Cassandra Connector can also execute custom CQL statements, so we can use it to create the Cassandra keyspace and table:

import com.datastax.spark.connector.cql.CassandraConnector

CassandraConnector(sc.getConf).withSessionDo { session =>
    session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
    session.execute("""CREATE TABLE IF NOT EXISTS
        test.iris (
            id uuid primary key,
            sepal_l double,
            sepal_w double,
            petal_l double,
            petal_w double,
            species text
        )
    """)
}

Finally, save the data to Cassandra:

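import com.datastax.spark.connector._  // connector implicits; harmless if the DSE shell already imported them
parsed.saveToCassandra("test", "iris")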

Load Data from Apache Cassandra

Loading data is very simple

val data = sc.cassandraTable[Iris]("test", "iris").cache()

The cache() call marks the RDD to be kept in memory once it has been computed, so later operations reuse the in-memory copy instead of reading the table from Cassandra again.

Prepare Data for MLlib

MLlib works with LabeledPoint objects, each consisting of a label (a Double that identifies the class) and an array of Double features. So we need to define a mapping from flower name to index and back. The code selects all species values, gets the distinct ones, indexes them and builds a map; it then creates the reverse map.

// species name -> numeric label
val class2id = data.map(_.species).distinct.collect.zipWithIndex.map { case (k, v) => (k, v.toDouble) }.toMap
// numeric label -> species name
val id2class = class2id.map(_.swap)
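One caveat: distinct followed by collect does not guarantee any particular order, so the species-to-index mapping can change between runs, which matters if a model is saved and id2class is rebuilt later. A possible tweak, assuming you want a stable mapping, is to sort the distinct names before indexing. The sketch below shows the idea on a plain Scala collection; the `speciesSample` values are sample data standing in for `data.map(_.species)`, and the `stable...` names are hypothetical, chosen so they don’t shadow the shell’s own maps.

```scala
// Sample species values, standing in for data.map(_.species) in the shell.
val speciesSample = Seq("Iris-versicolor", "Iris-setosa", "Iris-virginica", "Iris-setosa")

// Sorting the distinct names fixes the name -> index assignment across runs.
val stableClass2id: Map[String, Double] =
  speciesSample.distinct.sorted.zipWithIndex.map { case (name, idx) => (name, idx.toDouble) }.toMap

// Reverse map: numeric label -> species name.
val stableId2class: Map[Double, String] = stableClass2id.map(_.swap)
```

In the shell, the same idea would read `data.map(_.species).distinct.collect.sorted.zipWithIndex...`. With a sorted mapping, the numeric labels may differ from the ones printed later in this article, which came from an unsorted run.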

Map Iris data to LabeledPoint

import org.apache.spark.mllib.regression.LabeledPoint

// Features in the order: petal length, petal width, sepal length, sepal width
val parsedData = data.map { i => LabeledPoint(class2id(i.species), Array(i.petal_l, i.petal_w, i.sepal_l, i.sepal_w)) }
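The order of the features matters: the model only sees array positions, so every prediction input must use the same order as the training data (here petal length, petal width, sepal length, sepal width). One way to avoid mismatches, sketched below with a hypothetical `features` helper and a local copy of the Iris case class, is to define the feature extraction once and reuse it for both training and prediction.

```scala
import java.util.UUID

// Local copy of the article's case class so this snippet compiles on its own.
case class Iris(id: UUID, sepal_l: Double, sepal_w: Double,
                petal_l: Double, petal_w: Double, species: String)

// Hypothetical helper: the single place that defines the feature order
// (petal length, petal width, sepal length, sepal width, as in the article).
def features(i: Iris): Array[Double] =
  Array(i.petal_l, i.petal_w, i.sepal_l, i.sepal_w)

// A flower with sepal 6.4 x 3.2 and petal 5.0 x 1.5 (species unknown).
val unknown = Iris(UUID.randomUUID(), 6.4, 3.2, 5.0, 1.5, "unknown")
val unknownFeatures = features(unknown)
```

In the shell, without redefining Iris, training would become `data.map { i => LabeledPoint(class2id(i.species), features(i)) }` and prediction `model.predict(features(someIris))` for some hypothetical Iris value `someIris`, so both sides share one definition of the order.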

Work with MLlib predictors

Train a NaiveBayes classifier:

import org.apache.spark.mllib.classification.NaiveBayes

val model = NaiveBayes.train(parsedData)

Training is done, and now we can classify an iris by passing its 4 measurements in the same order used for training (petal length, petal width, sepal length, sepal width):

model.predict(Array(5, 1.5, 6.4, 3.2))

res6: Double = 2.0

Or, more readably, translate the label back to the species name:

id2class(model.predict(Array(5, 1.5, 6.4, 3.2)))

res7: String = Iris-versicolor
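To build some intuition for what the trained model does, here is a small plain-Scala sketch of multinomial Naive Bayes with additive smoothing; MLlib’s NaiveBayes is a multinomial model whose smoothing parameter, lambda, defaults to 1.0. This is an illustration, not MLlib’s code: `NBModel`, `trainNB`, `predictNB` and the six-row toy training set are assumptions for the example. Each class gets a log prior from its frequency, each feature gets a per-class log weight from its share of that class’s feature totals, and prediction picks the class with the highest prior plus weighted sum.

```scala
// Illustrative multinomial Naive Bayes with additive smoothing.
// A from-scratch sketch for intuition, not MLlib's implementation.
case class NBModel(labels: Array[Double],
                   logPrior: Array[Double],          // log P(class)
                   logTheta: Array[Array[Double]])   // log P(feature | class)

// Hypothetical trainer: data is (label, features); lambda is the smoothing term.
def trainNB(data: Seq[(Double, Array[Double])], lambda: Double = 1.0): NBModel = {
  val numFeatures = data.head._2.length
  // Group points by label; sort so the label order is deterministic.
  val byLabel = data.groupBy(_._1).toArray.sortBy(_._1)
  val numLabels = byLabel.length
  val logPriorDenom = math.log(data.size + numLabels * lambda)
  val logPrior = byLabel.map { case (_, pts) => math.log(pts.size + lambda) - logPriorDenom }
  val logTheta = byLabel.map { case (_, pts) =>
    // Sum each feature over the points of this class.
    val sums = pts.map(_._2).reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    val logThetaDenom = math.log(sums.sum + numFeatures * lambda)
    sums.map(s => math.log(s + lambda) - logThetaDenom)
  }
  NBModel(byLabel.map(_._1), logPrior, logTheta)
}

// Score each class as log prior + sum(feature value * log weight); pick the best.
def predictNB(model: NBModel, x: Array[Double]): Double = {
  val scores = model.logPrior.indices.map { c =>
    model.logPrior(c) + model.logTheta(c).zip(x).map { case (t, v) => t * v }.sum
  }
  model.labels(scores.indexOf(scores.max))
}

// Toy training set: six rows from the Iris data, features ordered as in the article
// (petal length, petal width, sepal length, sepal width).
// Labels: 0.0 setosa, 1.0 versicolor, 2.0 virginica.
val toyData = Seq(
  (0.0, Array(1.4, 0.2, 5.1, 3.5)), (0.0, Array(1.4, 0.2, 4.9, 3.0)),
  (1.0, Array(4.7, 1.4, 7.0, 3.2)), (1.0, Array(4.5, 1.5, 6.4, 3.2)),
  (2.0, Array(6.0, 2.5, 6.3, 3.3)), (2.0, Array(5.1, 1.9, 5.8, 2.7))
)
val toyModel = trainNB(toyData)
val setosaGuess = predictNB(toyModel, Array(1.5, 0.2, 5.0, 3.4))
val articleInputGuess = predictNB(toyModel, Array(5.0, 1.5, 6.4, 3.2))
```

On this toy data, the article’s input (5, 1.5, 6.4, 3.2) comes out as label 1.0, which the sketch assigns to versicolor; with only six training rows this is an illustration of the mechanics, not a substitute for the model trained on the full data set.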

To be continued...

I’d like to stop at this point. A data scientist will ask a lot of questions here:

  • How can we split the data into training and test sets?
  • How can we save a model and deploy it to production?
  • How can we measure the quality of the model?
  • How can we tune the training algorithms?
  • How can we chart the data?

And other good questions. Stay tuned for Part II.
