Technology | July 28, 2021

Analyzing Cassandra Data using GPUs, Part 1

Organizations keep much of their high-speed transactional data in fast NoSQL data stores like Apache Cassandra®. Eventually, requirements emerge to obtain analytical insights from this data. Historically, users have leveraged external, massively parallel processing analytics systems like Apache Spark for this purpose. However, today's analytics ecosystem is quickly embracing AI and ML techniques whose computation relies heavily on GPUs. In this post, we explore a cutting-edge approach for processing Cassandra SSTables by parsing them directly into GPU device memory using tools from the RAPIDS ecosystem. This will allow users to reach insights faster with less initial setup and also make it easy to migrate existing analytics code written in Python.

In this first post of a two-part series, we’ll take a quick dive into the RAPIDS project and explore a series of options to make data from Cassandra available for analysis with RAPIDS. Ultimately we’ll describe our current approach: parsing SSTable files in C++ and converting them into a GPU-friendly format, making the data easier to load into GPU device memory. 

If you want to skip the step-by-step journey and try out sstable-to-arrow now, check out the second post.

What is RAPIDS?

RAPIDS is a suite of open source libraries for doing analytics and data science end-to-end on a GPU. It is built on CUDA, a developer toolkit from NVIDIA that lets developers take advantage of their GPUs. RAPIDS takes common AI/ML APIs like those of pandas and scikit-learn and makes them available with GPU acceleration. Data science, and particularly machine learning, involves a lot of parallel calculations, which makes it well suited to a GPU: a GPU can “multitask” at a scale a few orders of magnitude beyond that of current CPUs.

Once we get the data on the GPU in the form of a cuDF DataFrame (essentially the RAPIDS equivalent of a pandas DataFrame), we can interact with it using an API almost identical to those of the Python libraries you might be familiar with, such as pandas and scikit-learn.

RAPIDS uses Apache Arrow as its underlying memory format. Arrow is column-oriented rather than row-oriented, which allows faster analytic queries. It also comes with an inter-process communication (IPC) mechanism used to transfer an Arrow record batch (essentially a chunk of a table) between processes. The IPC format is identical to the in-memory format, which eliminates extra copying and deserialization costs and gives us extremely fast data access.

The benefits of running analytics on a GPU are clear. Given the proper hardware, you can often migrate existing data science code to run on the GPU by replacing the names of Python data science libraries with their RAPIDS equivalents.
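
To illustrate how small that change can be, here is a sketch in which the only library-specific line is the import. It assumes the operations used are among those cuDF mirrors from pandas (construction from a dict, groupby, and mean are), and it falls back to pandas when cuDF is not installed. The alias xdf and the sample data are assumptions made for this example.

```python
# Fall back to pandas when cuDF is not installed,
# so the same code runs in both environments.
try:
    import cudf as xdf
except ImportError:
    import pandas as xdf

# Illustrative data; the column names are assumptions for this sketch.
df = xdf.DataFrame({
    "sensor": ["a", "b", "a", "b"],
    "reading": [1.0, 2.0, 3.0, 4.0],
})

# The same groupby/aggregate call works with either library.
mean_by_sensor = df.groupby("sensor")["reading"].mean()
print(mean_by_sensor)
```

Not every pandas feature has a cuDF counterpart, so larger codebases may need more than a renamed import.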

So how do we get Cassandra data onto the GPU?

Over the past few weeks, I've been taking a look at five different approaches, listed in order of increasing complexity below:

1. Fetch the data using the Cassandra driver, convert it into a pandas DataFrame, and then turn it into a cuDF.

2. Same as the above, but skip the pandas step and transform data from the driver directly into an Arrow table.

3. Read SSTables from the disk using Cassandra server code, serialize it using the Arrow IPC stream format, and send it to the client.

4. Same as approach 3, but use our own parsing implementation in C++ instead of using Cassandra code.

5. Same as approach 4, but use GPU vectorization with CUDA while parsing the SSTables.

First, I'll give a brief overview of each of these approaches, and then go through a comparison at the end and explain our next steps.

1. Fetch data using the Cassandra driver

This approach is quite simple, because you can use existing libraries without having to do too much hacking. We grab the data from the driver, setting session.row_factory to our pandas_factory function to tell the driver how to transform the incoming data into a pandas.DataFrame. Then, it’s a simple matter to call the cudf.DataFrame.from_pandas function to load our data onto the GPU, where we can then use the RAPIDS libraries to run GPU-accelerated analytics.

The following code requires you to have access to a running Cassandra cluster. See the DataStax Python Driver docs for more info. You will also need to install the required Python libraries with Conda:

Bash
      
conda install -c blazingsql -c rapidsai -c nvidia -c conda-forge -c defaults blazingsql cudf pyarrow pandas numpy cassandra-driver
      
    
Python
      
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

import pandas as pd
import pyarrow as pa
import cudf
from blazingsql import BlazingContext

# connect to the Cassandra server in the cloud and configure the session settings
cloud_config = {
    'secure_connect_bundle': '/path/to/secure/connect/bundle.zip'
}
auth_provider = PlainTextAuthProvider(username='your_username_here', password='your_password_here')
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

def pandas_factory(colnames, rows):
    """Read the data returned by the driver into a pandas DataFrame"""
    return pd.DataFrame(rows, columns=colnames)
session.row_factory = pandas_factory

# run the CQL query and get the data
result_set = session.execute("select * from your_keyspace.your_table_name limit 100;")
df = result_set._current_rows  # the pandas DataFrame built by pandas_factory (_current_rows is a private driver attribute)
gpu_df = cudf.DataFrame.from_pandas(df)  # copy the DataFrame into GPU memory

# do GPU-accelerated operations, such as SQL queries with blazingsql
bc = BlazingContext()
bc.create_table("gpu_table", gpu_df)
bc.describe_table("gpu_table")
result = bc.sql("SELECT * FROM gpu_table")
print(result)
      
    

2. Fetch data using the Cassandra driver directly into Arrow

This approach is identical to the previous one, except that we swap pandas_factory for the following arrow_factory:

Python
      
def get_col(col):
    rtn = pa.array(col) # automatically detects the type of the array

    # a full implementation would need to check which Arrow types have
    # to be cast manually for compatibility with cudf
    if pa.types.is_decimal(rtn.type):
        return rtn.cast('float32')
    return rtn

def arrow_factory(colnames, rows):
    # convert from the row format passed by
    # CQL into the column format of arrow
    cols = [get_col(col) for col in zip(*rows)]
    table = pa.table({ colnames[i]: cols[i] for i in range(len(colnames)) })
    return table

session.row_factory = arrow_factory
      
    

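We can then fetch the data in the same way and create the cuDF from the resulting Arrow table with cudf.DataFrame.from_arrow instead of cudf.DataFrame.from_pandas.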

However, both of these approaches have a major drawback: they rely on querying the existing Cassandra cluster, which we don’t want, because the read-heavy analytics workload might affect the transactional production workload, where real-time performance is key.

Instead, we want to see if there's a way to get the data directly from the SSTable files on the disk without going through the database. This brings us to the next three approaches.

3. Read SSTables from the disk using Cassandra server code

Probably the simplest way to read SSTables on disk is to use existing Cassandra server code, namely SSTableLoader. Once we have a list of partitions from the SSTable, we can manually transform the data from Java objects into Arrow Vectors corresponding to the columns of the table. Then we can serialize the collection of vectors into the Arrow IPC stream format and send it across a socket.

The code here is more complex than that of the previous two approaches and less developed than that of the next approach, so I haven’t included it in this post. Another drawback is that although this approach can run in a different process or on a different machine from the Cassandra cluster, using SSTableLoader requires first initializing embedded Cassandra in the client process, which takes a considerable amount of time on a cold start.

4. Use a custom SSTable parser

To avoid initializing Cassandra, we developed our own implementation in C++ for parsing the binary data in SSTable files. More information about this approach can be found in the next blog post. A guide to the Cassandra storage engine by The Last Pickle helped a lot when deciphering the data format. We chose C++ for the parser both to anticipate eventually bringing in CUDA and for the low-level control it offers when handling binary data.

5. Integrate CUDA to speed up table reads

We plan to start working on this approach once the custom parsing implementation becomes more comprehensive. Taking advantage of GPU vectorization should greatly speed up the reading and conversion processes.

Comparison

At the current stage, we're mainly concerned with the time it takes to read the SSTable files. For approaches 1 and 2, we can't actually measure this time fairly, because 1) they rely on additional hardware (the Cassandra cluster) and 2) there are complex caching effects at play within Cassandra itself. However, for approaches 3 and 4, we can add simple instrumentation to track how much time the program takes to read the SSTable file from start to finish.
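
A start-to-finish timing of this kind can be sketched with the Python standard library alone. This is not the measurement code used for the results in this post: read_file is a hypothetical stand-in for an SSTable reader, and a temporary file of random bytes plays the role of the data file.

```python
import os
import tempfile
import time


def read_file(path):
    """Stand-in for an SSTable reader: read the whole file and return its size."""
    total = 0
    with open(path, "rb") as f:
        while chunk := f.read(1 << 20):
            total += len(chunk)
    return total


def timed(func, *args):
    """Return (result, elapsed_seconds) for a single call to func."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


# A temporary file of arbitrary bytes plays the role of an SSTable data file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(os.urandom(1 << 16))
    path = tmp.name

try:
    size, elapsed = timed(read_file, path)
    print(f"read {size} bytes in {elapsed:.6f} s")
finally:
    os.remove(path)
```

A single run like this is sensitive to the operating system's file cache, so a fair comparison would repeat each measurement several times.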

We ran both approaches against datasets with 1k, 5k, 10k, 50k, 100k, 500k, and 1M rows of data generated by NoSQLBench. The resulting graph shows that the custom implementation is slightly faster than the existing Cassandra implementation, even without any additional optimizations such as multithreading.

Conclusion

Given that data access patterns for analytical use cases usually include large scans and often involve reading entire tables, the most efficient way to get at this data is not through CQL but by reading SSTables directly. We were able to implement an SSTable parser in C++ that does this and converts the data to Apache Arrow so that it can be used by analytics libraries, including NVIDIA’s GPU-powered RAPIDS ecosystem. The resulting open source (Apache 2 licensed) project is called sstable-to-arrow, and it is available on GitHub and accessible via Docker Hub as an alpha release.

We will be holding a free online workshop in mid-August that will go deeper into this project with hands-on examples. Sign up here if you’re interested.

If you’re interested in trying out sstable-to-arrow, take a look at the second blog post in this two-part series and feel free to reach out to seb@datastax.com with any feedback or questions.
