Technology | August 26, 2014

Accessing Cassandra from Spark in Java

A few weeks ago we decided to release our Spark Cassandra Connector as open source (GitHub: datastax/spark-cassandra-connector). The connector is intended primarily for use from Scala; however, customers and the community have asked to use it from Java as well. To address this need, we have created an additional module that provides a Java API for the connector. In this post, we describe how to use this new API to access Apache Cassandra® via Apache Spark™ from Java applications.

Prerequisites

At the time of writing, version 1.0.0-rc4 of the connector was available in the central Maven repository. This version works with Cassandra 2.0 and Spark 0.9.

Note: This blog post was written for DSE 4.5, which included Apache Spark™ 0.9. If you use a different version of DSE, please refer to the DataStax documentation for that version.

The easiest way to run this application is to follow these steps:

  1. Download and run Cassandra server
  2. Create a blank Maven project
  3. Add the following artifacts to the dependencies section:
  4. Implement the application (see the tutorial below)
  5. Compile and run the application with the parameters local[4] 127.0.0.1. The first argument runs the application in local mode with four worker threads, so no real Spark cluster is needed, which is best for learning and testing. The second argument is the address of a Cassandra node.

Tutorial

Let's create a simple application that reads sales information and computes roll-up summaries for a product hierarchy.

The product hierarchy may look like the one depicted below:

[Figure: product hierarchy]

We will use the following stub of the application:

package com.datastax.spark.demo;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JavaDemo implements Serializable {
    // transient: the configuration is not serialized together with this object
    private transient SparkConf conf;

    private JavaDemo(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {
    }

    private void compute(JavaSparkContext sc) {
    }

    private void showResults(JavaSparkContext sc) {
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        JavaDemo app = new JavaDemo(conf);
        app.run();
    }
}

In the subsequent sections we will implement those three empty methods which generate the data, compute summaries and display the results.

Generate data

This application creates the schema and generates random data each time it is run. The data generation procedure has been broken into three simple steps, which are presented below.

Create schema

First we connect to Cassandra using CassandraConnector, drop the keyspace if it exists, and then create all the tables from scratch. Although CassandraConnector is implemented in Scala, we can easily use it from Java with the try-with-resources syntax:

CassandraConnector connector = CassandraConnector.apply(sc.getConf());

try (Session session = connector.openSession()) {
    session.execute("DROP KEYSPACE IF EXISTS java_api");
    session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
    session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
    session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
    session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
}

The schema consists of three entities. We assume that the corresponding bean classes (Product, Sale, Summary) are already implemented. When creating beans to be used with the connector, remember to add appropriate getters, setters and a no-args constructor (more info here).
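
The post does not show the bean classes themselves, so the following is only a minimal sketch of what Product might look like. The field names are an assumption chosen to match the columns of java_api.products (id, name, parents); the three-argument constructor matches the way Product is instantiated later in this post, and the toString method is an addition for readable output.

```java
import java.io.Serializable;
import java.util.List;

// Hypothetical sketch of the Product bean; the original class is not shown in the post.
// Declared package-private so the snippet compiles in any file; in a real project
// it would normally be a public class in its own Product.java.
class Product implements Serializable {
    private Integer id;
    private String name;
    private List<Integer> parents;

    // No-args constructor, which the text above lists as a requirement for beans.
    public Product() {
    }

    // Convenience constructor used when building the hierarchy by hand.
    public Product(Integer id, String name, List<Integer> parents) {
        this.id = id;
        this.name = name;
        this.parents = parents;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public List<Integer> getParents() { return parents; }
    public void setParents(List<Integer> parents) { this.parents = parents; }

    @Override
    public String toString() {
        return "Product{id=" + id + ", name='" + name + "', parents=" + parents + "}";
    }
}
```

Sale and Summary would presumably follow the same pattern, with properties named after the columns of their tables.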

Generate the product hierarchy

Now we generate the product hierarchy depicted above. Of course, we could do this with simple insert statements; however, doing it the Spark way lets us introduce the methods used to save RDDs to Cassandra. First we create a list of Product instances, then we convert that list to an RDD, and finally we save it to Cassandra:

List<Product> products = Arrays.asList(
        new Product(0, "All products", Collections.<Integer>emptyList()),
        new Product(1, "Product A", Arrays.asList(0)),
        new Product(4, "Product A1", Arrays.asList(0, 1)),
        new Product(5, "Product A2", Arrays.asList(0, 1)),
        new Product(2, "Product B", Arrays.asList(0)),
        new Product(6, "Product B1", Arrays.asList(0, 2)),
        new Product(7, "Product B2", Arrays.asList(0, 2)),
        new Product(3, "Product C", Arrays.asList(0)),
        new Product(8, "Product C1", Arrays.asList(0, 3)),
        new Product(9, "Product C2", Arrays.asList(0, 3))
);

JavaRDD<Product> productsRDD = sc.parallelize(products);
javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products");

javaFunctions is a static method defined in the CassandraJavaUtil class. We strongly encourage users to import this class statically to make the code clearer:

import static com.datastax.spark.connector.CassandraJavaUtil.*;

The methods defined in CassandraJavaUtil are the main entry points to the functionality offered by the connector in Java. They are used to create special wrappers around objects such as RDD, DStream, SparkContext and StreamingContext, and their Java counterparts: JavaRDD, JavaDStream, JavaSparkContext and JavaStreamingContext.

Generate random sales

Finally, we want to generate random sales data. Here, too, we could use simple insert statements, but we won't. Instead, we will use Spark to filter the products that are leaves in the hierarchy (a very naive approach: we just check the number of parents), and then create 1000 random sales for each such product.

JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
    @Override
    public Boolean call(Product product) throws Exception {
        return product.getParents().size() == 2;
    }
}).flatMap(new FlatMapFunction<Product, Sale>() {
    @Override
    public Iterable<Sale> call(Product product) throws Exception {
        Random random = new Random();
        List<Sale> sales = new ArrayList<>(1000);
        for (int i = 0; i < 1000; i++) {
            sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
        }
        return sales;
    }
});

javaFunctions(salesRDD, Sale.class).saveToCassandra("java_api", "sales");

In the last statement, we created a wrapper around JavaRDD. That wrapper provides a number of overloaded saveToCassandra methods. In its simplest form, it writes Java bean-like classes to Cassandra. The attributes of the bean class are mapped to the corresponding columns by name. With the other versions of saveToCassandra, custom mappings can be defined.
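
The leaf test used above (exactly two parents) only works because this hierarchy has a fixed depth of three levels. As a rough sketch of a more general check, in plain Java and without Spark, a product can be treated as a leaf when its id never appears in any other product's parents list. The class and method names below (LeafFinder, findLeaves) are hypothetical and not part of the connector or of the demo application.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the demo application.
class LeafFinder {

    // parentsById maps a product id to the ids of all its ancestors,
    // mirroring the "parents" column of java_api.products.
    static List<Integer> findLeaves(Map<Integer, List<Integer>> parentsById) {
        // Every id mentioned in some parents list has at least one descendant.
        Set<Integer> usedAsParent = new HashSet<>();
        for (List<Integer> parents : parentsById.values()) {
            usedAsParent.addAll(parents);
        }
        // The remaining ids have no descendants, so they are leaves.
        List<Integer> leaves = new ArrayList<>();
        for (Integer id : parentsById.keySet()) {
            if (!usedAsParent.contains(id)) {
                leaves.add(id);
            }
        }
        return leaves;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> parentsById = new LinkedHashMap<>();
        parentsById.put(0, Collections.<Integer>emptyList());
        parentsById.put(1, Arrays.asList(0));
        parentsById.put(4, Arrays.asList(0, 1));
        parentsById.put(5, Arrays.asList(0, 1));
        System.out.println(findLeaves(parentsById)); // prints [4, 5]
    }
}
```

This version runs on the driver over an in-memory map, which is fine for a hierarchy this small; a large hierarchy would need a distributed formulation instead.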

Compute summaries

In this subsection we are going to compute roll-up summaries for sales of each product.

JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
        .cassandraTable("java_api", "products", Product.class)
        .keyBy(new Function<Product, Integer>() {
            @Override
            public Integer call(Product product) throws Exception {
                return product.getId();
            }
        });

JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
        .cassandraTable("java_api", "sales", Sale.class)
        .keyBy(new Function<Sale, Integer>() {
            @Override
            public Integer call(Sale sale) throws Exception {
                return sale.getProduct();
            }
        });

First, the productsRDD and salesRDD RDDs are created. They contain (product id, product) and (product id, sale) pairs, respectively. In the next step we want to join these RDDs, which is why they had to be converted into pair RDDs.

JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);

At this point, joinedRDD contains tuples in which each sale is paired with the corresponding product. One way to compute roll-ups (not necessarily the best one) is to replicate each sale for every product in the branch it belongs to. To do this, we use the flatMap method:

JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMap(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
    @Override
    public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
        Tuple2<Sale, Product> saleWithProduct = input._2();
        List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
        allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
        for (Integer parentProduct : saleWithProduct._2().getParents()) {
            allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
        }
        return allSales;
    }
});

Once we have the RDD with product id and price for each sale and each hierarchy node that sale belongs to, we can simply reduce by key so that we obtain a sum of all the sales for each hierarchy node. Then, we convert the tuples to Summary objects:

JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
    @Override
    public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
        return v1.add(v2);
    }
}).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
    @Override
    public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
        return new Summary(input._1(), input._2());
    }
});

Finally, we use the saveToCassandra method to save the summaries to the database, as we did before:

javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
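
The roll-up logic of this section can also be checked on a small scale without Spark or Cassandra. The sketch below is a plain-Java illustration of the same idea: each sale is counted once for its own product and once for every ancestor, and the amounts are then summed per key. The names RollUpSketch and addSale are hypothetical, and Map.merge requires Java 8, whereas the rest of this post uses Java 7 syntax.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Spark-free illustration of the roll-up computation.
class RollUpSketch {

    // Adds the price of one sale to its own product and to every ancestor.
    // This combines the flatMap step (replicate) and the reduceByKey step (sum).
    static void addSale(Map<Integer, BigDecimal> totals, int product,
                        List<Integer> parents, BigDecimal price) {
        totals.merge(product, price, BigDecimal::add);
        for (Integer parent : parents) {
            totals.merge(parent, price, BigDecimal::add);
        }
    }

    public static void main(String[] args) {
        Map<Integer, BigDecimal> totals = new TreeMap<>();
        addSale(totals, 4, Arrays.asList(0, 1), new BigDecimal("0.50")); // Product A1
        addSale(totals, 5, Arrays.asList(0, 1), new BigDecimal("0.25")); // Product A2
        addSale(totals, 6, Arrays.asList(0, 2), new BigDecimal("1.00")); // Product B1
        // prints {0=1.75, 1=0.75, 2=1.00, 4=0.50, 5=0.25, 6=1.00}
        System.out.println(totals);
    }
}
```

The root node (id 0) ends up with the sum of all sales, and each intermediate node with the sum of the sales in its own branch, which is what the summaries table is expected to contain.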

Show the results

We want to display the results so that each product name is printed together with its summary. To do this, we need to join an RDD of summaries and an RDD of products by product key:

JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
        .cassandraTable("java_api", "summaries", Summary.class)
        .keyBy(new Function<Summary, Integer>() {
            @Override
            public Integer call(Summary summary) throws Exception {
                return summary.getProduct();
            }
        });

JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
        .cassandraTable("java_api", "products", Product.class)
        .keyBy(new Function<Product, Integer>() {
            @Override
            public Integer call(Product product) throws Exception {
                return product.getId();
            }
        });

Displaying the results is pretty straightforward now:

List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();

for (Tuple2<Product, Optional<Summary>> result : results) {
    System.out.println(result);
}
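
Printing the tuples directly relies on the toString methods of the beans. If a more readable line per product is wanted, a small formatting helper along the following lines could be used. This is only a sketch: ResultFormatter and formatLine are hypothetical names, and java.util.Optional (Java 8) is used here as a stand-in for the Optional type returned by leftOuterJoin, which is presumably Guava's in this version of Spark.

```java
import java.math.BigDecimal;
import java.util.Optional;

// Hypothetical helper for printing one result line; not part of the demo application.
class ResultFormatter {

    // Returns "<name>: <summary>", or "<name>: no sales" when the left outer join
    // found no summary row for the product.
    static String formatLine(String productName, Optional<BigDecimal> summary) {
        String value = summary.map(BigDecimal::toPlainString).orElse("no sales");
        return productName + ": " + value;
    }

    public static void main(String[] args) {
        System.out.println(formatLine("Product A", Optional.of(new BigDecimal("12.50"))));
        System.out.println(formatLine("Product D", Optional.<BigDecimal>empty()));
    }
}
```

In the demo data every hierarchy node receives sales, so the "no sales" branch would only matter for products added without any sales.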

Summary

This tutorial shows the two main features of the Spark Cassandra Connector: reading data from Cassandra into an RDD and writing an RDD to Cassandra. It is intended to be a quick and simple introduction to using Cassandra in Spark applications. More information about the Java API can be found in the documentation.

The full source code of the JavaDemo class and a sample pom.xml file can be found here.
