Technology | March 1, 2021

Developing High Performance Apache Cassandra® Applications in Rust (Part 1)

Rust has seen huge gains in popularity recently. It combines C++-level runtime performance, low resource requirements, and fine-grained control over details with strong correctness and safety features, a rare mixture not found in most other programming languages. In particular, Rust protects against a wide class of bugs that programmers can introduce when writing parallel or concurrent code. Concurrent code is hard to get right and can take ages to debug, and massively parallel code is exactly the kind of code required to get the most out of Apache Cassandra.

In this post, I'll explore the options available to develop a Cassandra app in Rust.

Driver Options

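Several Rust drivers are currently available to help build Cassandra apps:

  • Rust bindings to the official DataStax C++ driver: cassandra-rs
  • CDRS, a family of drivers written from scratch in Rust
  • The ScyllaDB driver, also written from scratch in Rust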

Prerequisites

Cassandra achieves the best performance if it has enough work to do. One of the performance antipatterns is sending a query, waiting for the result, then sending another one, waiting again, and all that in a single thread. This isn’t the proper way to take advantage of Cassandra’s massively parallel architecture.
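To put rough numbers on this antipattern, here is a minimal back-of-the-envelope sketch. It assumes a constant round-trip latency and ignores server-side queuing, so treat it as an illustration rather than a benchmark. The function names and the idea of running queries in fixed-size "waves" are assumptions made for this example only.

```rust
/// Estimated wall-clock time when each query waits for the previous one
/// to finish (the antipattern described above).
fn sequential_time_ms(queries: u64, latency_ms: u64) -> u64 {
    queries * latency_ms
}

/// Estimated wall-clock time when up to `in_flight` queries overlap.
/// Simplifying assumption: queries run in waves of `in_flight`,
/// and each wave takes one round trip.
fn concurrent_time_ms(queries: u64, latency_ms: u64, in_flight: u64) -> u64 {
    assert!(in_flight > 0, "at least one query must be in flight");
    let waves = (queries + in_flight - 1) / in_flight; // ceiling division
    waves * latency_ms
}
```

Under these assumptions, 10,000 queries at 1 ms each take about 10 seconds one by one, but only about 100 ms with 100 queries in flight at a time.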

Async driver APIs avoid this antipattern by allowing many queries to run in parallel, even from a single thread of execution. So here, we'll focus only on asynchronous code, and all the examples assume they are executed in an async context. This section briefly explains the few basic steps you need to take to enable async in Rust.

First you need to select an async runtime to use. There are two major, well-developed, popular choices: tokio and async-std. It is up to you which one you choose, but for the purpose of this blog post, we'll use tokio.

Enabling tokio is very easy. Add the following dependency to Cargo.toml:

[dependencies]
tokio = { version = "1", features = ["full"] }

Next, initialize the async context by providing an async main function:

#[tokio::main]
async fn main() {
  // all async code goes here.
}

Please refer to the tokio documentation (https://docs.rs/tokio/1.2.0/tokio/) for more details on tuning the schedulers, enabling or disabling features, etc.

DataStax C++ Driver Bindings

This is the most feature-complete and the most battle-tested driver. The bindings cover most of the features available in the official C++ driver from DataStax:

  • Connects to Apache Cassandra, DataStax Enterprise and ScyllaDB
  • CQL protocol versions 3 and 4
  • Asynchronous API
  • Simple, Prepared, and Batch statements
  • Result set paging
  • Asynchronous I/O, parallel execution, and request pipelining
  • Compression
  • Password authentication
  • SSL
  • Connection pooling
  • Automatic node discovery
  • Automatic reconnection with tunable retry policies
  • Idle connection heartbeats
  • Configurable load balancing
  • Latency-aware routing
  • Performance metrics
  • Support for Rust types
  • Tuples and UDTs
  • Collections
  • Client-side timestamps
  • Custom data types
  • Support for materialized view and secondary index metadata
  • Support for clustering key order, frozen<> and Cassandra version metadata
  • Blacklist, whitelist DC, and blacklist DC load balancing policies
  • Reverse DNS with SSL peer identity verification support
  • Randomized contact points
  • Speculative execution

Unfortunately, DSE-specific features like Kerberos, LDAP authentication, geospatial types, and DateRange are not exposed yet. Functions to connect to the DataStax Astra Cloud Data Platform are also not exposed.

Installation

First, you need to install the dependencies of the C++ driver. On Ubuntu 18.04 (Bionic) and newer:

sudo apt-get install libuv1 libuv1-dev libssl1.1 libssl-dev

It is possible to install these dependencies on other distributions, but in some cases you may need to download the packages manually and install them with dpkg.

Next, install the C++ driver package:

wget https://downloads.datastax.com/cpp-driver/ubuntu/18.04/cassandra/v2.15.3/cassandra-cpp-driver_2.15.3-1_amd64.deb
wget https://downloads.datastax.com/cpp-driver/ubuntu/18.04/cassandra/v2.15.3/cassandra-cpp-driver-dev_2.15.3-1_amd64.deb
sudo dpkg -i cassandra-cpp-driver_2.15.3-1_amd64.deb cassandra-cpp-driver-dev_2.15.3-1_amd64.deb

Note that you need the -dev dependencies only for compiling the app. Users of your app will need to install only the packages without -dev.

Project Setup

Just add cassandra-cpp and, optionally, time to Cargo.toml:

[dependencies]
...
cassandra-cpp = "0.15.1"
time = "0.1.44"

Unfortunately there seems to be no way to use a common async runtime for both the underlying C++ driver and async Rust code. The C++ driver uses libuv internally, but Rust doesn't support using its thread-pool as the async runtime yet. It is also likely not possible to switch the driver to use tokio or async-std thread-pools. Therefore, your app is going to end up with two separate thread-pools—one for the driver and another one for running async tasks in your Rust code. 

It is possible to use the driver without Rust async runtime at all, but in that case, you'd have to convert all async calls into blocking calls by calling wait on the returned futures, which isn't very efficient. In order to achieve good performance this way, you'd have to create many hundreds or even thousands of threads.

Connecting

The connection is configured through the Cluster type:

use cassandra_cpp::*;

let mut cluster = Cluster::default();

// Specify nodes to contact:
cluster.set_contact_points("host1").unwrap();
cluster.set_contact_points("host2").unwrap();

// Optional authentication:
cluster.set_credentials("user", "password");

// Optional SSL config. The certificate paths below are hypothetical
// placeholders; load the certificates from wherever you store them:
let host1_certificate: String = std::fs::read_to_string("host1.pem").unwrap();
let host2_certificate: String = std::fs::read_to_string("host2.pem").unwrap();
let mut ssl = Ssl::default();
ssl.add_trusted_cert(&host1_certificate).unwrap();
ssl.add_trusted_cert(&host2_certificate).unwrap();
cluster.set_ssl(&mut ssl);

// (Optional) How long to wait for connecting before bailing out:
cluster.set_connect_timeout(time::Duration::seconds(5));
// (Optional) How long to wait before attempting to reconnect after connection
// failure or disconnect:
cluster.set_reconnect_wait_time(100); // 100 ms
// (Optional) Set load balancing policy:
cluster.set_load_balance_round_robin();

Finally, we can connect and a Session is returned:

use std::process::exit;

let session = match cluster.connect() {
  Ok(s) => s,
  Err(e) => {
      eprintln!("error: Failed to connect to Cassandra: {}", e);
      exit(1)
  }
};

The returned Session object is Send+Sync, so it can be accessed from multiple threads concurrently.

Querying

Let's assume there is a table test in keyspace keyspace1 with the following schema:

CREATE TABLE test(pk BIGINT PRIMARY KEY, data VARCHAR)

In order to be able to quickly query it multiple times, prepare the statement first:

let read_cql = "SELECT data FROM keyspace1.test WHERE pk = ?";
let prepared: PreparedStatement = session
    .prepare(read_cql)
    .unwrap()
    .await
    .expect("Failed to prepare");

It’s important to prepare each statement only once and store the prepared object(s) somewhere for future use. Preparing a statement requires a roundtrip to the server and takes a significant amount of time. In production code you will probably put all the prepared statements together with the session object (or a reference to it) into a struct passed wherever database access is needed. Prepared statements are also Send+Sync so they can be used from many threads without synchronization!
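The "prepare once, keep it next to the session" advice can be sketched as follows. FakeSession and FakePrepared are hypothetical stand-ins for a driver's session and prepared-statement types (they are not part of cassandra-cpp), and the call counter exists only to show that prepare runs a single time even when several threads use the statement.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a driver session; counts `prepare` calls.
struct FakeSession {
    prepare_calls: AtomicUsize,
}

// Hypothetical stand-in for a prepared statement.
struct FakePrepared {
    cql: String,
}

impl FakeSession {
    fn prepare(&self, cql: &str) -> FakePrepared {
        self.prepare_calls.fetch_add(1, Ordering::SeqCst);
        FakePrepared { cql: cql.to_string() }
    }
}

/// Bundles the session with the statements prepared once at startup.
struct Db {
    session: FakeSession,
    read_by_pk: FakePrepared,
}

impl Db {
    fn new(session: FakeSession) -> Db {
        let read_by_pk = session.prepare("SELECT data FROM keyspace1.test WHERE pk = ?");
        Db { session, read_by_pk }
    }
}

/// Shares one `Db` across `workers` threads and returns how many times
/// `prepare` was called in total.
fn prepare_calls_with_workers(workers: usize) -> usize {
    let db = Arc::new(Db::new(FakeSession {
        prepare_calls: AtomicUsize::new(0),
    }));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let db = Arc::clone(&db);
            // Each worker reuses the already prepared statement.
            thread::spawn(move || db.read_by_pk.cql.len())
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    db.session.prepare_calls.load(Ordering::SeqCst)
}
```

With a real driver, the same shape would hold the actual session and prepared statement types, and the Arc could be cloned into async tasks instead of threads.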

To execute the statement prepared earlier, call bind to get a one-time Statement and set its parameters. For example, to print the row with primary key 10:

let key: i64 = 10;
let mut statement: Statement = prepared.bind();
statement.set_consistency(Consistency::LOCAL_QUORUM);
statement.set_retry_policy(RetryPolicy::downgrading_consistency_new());
statement.bind(0, key).unwrap();


let result = session.execute(&statement).await.expect("Failed to execute");


match result.first_row().unwrap() {
    Some(row) => println!("{}", row.get_column(0).unwrap().get_str().unwrap()),
    None => println!("No row found")
}

In order to receive more than one row, use result.iter():

for row in result.iter() {
    println!("{}", row.get_column(0).unwrap().get_string().unwrap());
}


CDRS

CDRS is a family of Cassandra drivers developed from scratch in Rust. The original (oldest) version came with a synchronous, blocking API only. Later, asynchronous versions using the async-std and tokio runtimes were developed as separate projects.

An obvious advantage of a pure-Rust driver is a much simpler installation, both for developers and for application users. No dependency on third-party C++ libraries means the only thing you need is Cargo.

Unfortunately, at the time of writing this blog post, CDRS is looking for maintainers, and CDRS-async labels itself as an "alpha version." The following sections will focus on CDRS-tokio, because it offers a performant async API and seems to be well-maintained.

CDRS features include:

  • Connects to Apache Cassandra, DataStax Enterprise and ScyllaDB
  • CQL protocol versions 3 and 4
  • Simple, prepared and batch statements
  • Result set paging
  • Password authentication
  • Pluggable authentication strategies
  • SSL
  • Load balancing
  • Connection pooling
  • Compression: LZ4, Snappy
  • Cassandra-to-Rust data deserialization
  • Server events listening
  • Query tracing information

Project Setup

Add the following dependencies to Cargo.toml:

[dependencies]
cdrs-tokio = "2.1.0"
chrono = "0.4"  # to allow representing Cassandra Timestamp by DateTime
tokio = { version = "1", features = ["full"] }

Connecting

In CDRS, you first set up the connection to each node of the cluster separately. All of these node configurations together then make up a cluster configuration, which allows you to connect and obtain a Session:

use cdrs_tokio::authenticators::{StaticPasswordAuthenticator, Authenticator};
use cdrs_tokio::cluster::{NodeTcpConfigBuilder, ClusterTcpConfig, session, TcpConnectionPool};
use cdrs_tokio::load_balancing::RoundRobin;
use cdrs_tokio::cluster::session::Session;
use tokio::time::Duration;


let authenticator = StaticPasswordAuthenticator::new("user", "password");
let node_address = "127.0.0.1:9042";
let node = NodeTcpConfigBuilder::new(node_address, authenticator)
    .max_size(5)
    .min_idle(Some(4))
    .max_lifetime(Some(Duration::from_secs(60)))
    .idle_timeout(Some(Duration::from_secs(60)))
    .build();


let cluster_config = ClusterTcpConfig(vec![node]);
let session = session::new(&cluster_config, RoundRobin::new())
    .await
    .expect("connect");

One problem that's not immediately visible in this code is the type of the created session. Let's add an explicit type annotation:

let session: Session<RoundRobin<TcpConnectionPool<StaticPasswordAuthenticator>>> =
    session::new(&cluster_config, RoundRobin::new())
        .await
        .expect("connect");

Some of the configuration, like the authenticator or load balancing, is encoded in the compile-time type of the session. What if we don't want to hard-code an authenticator, but configure it at runtime instead? Authenticator is a trait, so Rust trait objects could be used.

Unfortunately, the Authenticator trait is not object-safe, so declaring the authenticator as a Box<dyn Authenticator> does not work:

error[E0038]: the trait `cdrs_tokio::authenticators::Authenticator` cannot be made into an object
  --> src/cdrs_session.rs:13:24
   |
13 |     let authenticator: Box<dyn Authenticator> =
   |                        ^^^^^^^^^^^^^^^^^^^^^^ `cdrs_tokio::authenticators::Authenticator` cannot be made into an object
   |
   = note: the trait cannot be made into an object because it requires `Self: Sized`
   = note: for a trait to be "object safe" it needs to allow building a vtable to allow the call to be resolvable dynamically; for more information visit https://doc.rust-lang.org/reference/items/traits.html#object-safety

This means that until the issue is resolved, CDRS is not a good fit for applications that need to configure the database connection at runtime (e.g. based on config files).
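For readers unfamiliar with object safety, here is a minimal sketch of the general Rust situation and one common workaround: wrapping the concrete implementations in an enum. The Auth trait and all types below are hypothetical and are not the CDRS API. Whether the same pattern can be applied to CDRS's own Authenticator trait depends on its exact definition, which I have not verified.

```rust
// Hypothetical trait that is not object-safe: the `Clone` supertrait
// requires `Self: Sized`, so `Box<dyn Auth>` would be rejected.
trait Auth: Clone {
    fn token(&self) -> String;
}

#[derive(Clone)]
struct NoAuth;

#[derive(Clone)]
struct PasswordAuth {
    user: String,
    password: String,
}

impl Auth for NoAuth {
    fn token(&self) -> String {
        String::new()
    }
}

impl Auth for PasswordAuth {
    fn token(&self) -> String {
        format!("{}:{}", self.user, self.password)
    }
}

// Workaround: one concrete enum type that delegates to the chosen variant.
#[derive(Clone)]
enum AnyAuth {
    Anonymous(NoAuth),
    Password(PasswordAuth),
}

impl Auth for AnyAuth {
    fn token(&self) -> String {
        match self {
            AnyAuth::Anonymous(a) => a.token(),
            AnyAuth::Password(a) => a.token(),
        }
    }
}

/// Picks the authenticator at runtime, e.g. from values read from a config file.
fn auth_from_config(user: Option<&str>, password: Option<&str>) -> AnyAuth {
    match (user, password) {
        (Some(u), Some(p)) => AnyAuth::Password(PasswordAuth {
            user: u.to_string(),
            password: p.to_string(),
        }),
        _ => AnyAuth::Anonymous(NoAuth),
    }
}

/// Stands in for generic code that takes the authenticator as a type parameter.
fn connect_with<A: Auth>(auth: A) -> String {
    auth.token()
}
```

The generic code still sees a single compile-time type (AnyAuth), while the variant is chosen at runtime.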

Querying

As with the DataStax driver, using prepared statements is recommended for best performance:

use cdrs_tokio::query::{PreparedQuery, PrepareExecutor};


let statement: PreparedQuery = 
    session
        .prepare("SELECT data FROM keyspace1.test WHERE pk = ?")
        .await
        .expect("prepare query");

To issue a query, call the exec_with_values method and pass it a statement reference and the parameters. It is very important to supply integer types that exactly match the schema of your table. Cassandra integer types are signed, so, for example, a BIGINT column takes an i64, not a u64.

use cdrs_tokio::query_values;
use cdrs_tokio::types::prelude::Row;

let key = query_values!(10i64);
let result: Vec<Row> =
    session.exec_with_values(&statement, key)
        .await
        .expect("execute query")
        .get_body()
        .unwrap()
        .into_rows()
        .unwrap();

for row in result {
    let value: String = row.get_by_index(0).unwrap().unwrap();
    println!("{}", value);
}

The result is just a vector of rows. You can access the column values by calling the get_by_index() method, which can automatically convert the column value to common Rust types.
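The earlier point about matching integer types exactly is easier to see at the byte level. CQL's BIGINT is an 8-byte signed integer and INT is a 4-byte signed integer, both sent in big-endian order, so a value of the wrong Rust type produces a payload of the wrong size or meaning. The helper names below are my own, chosen only for this sketch.

```rust
use std::convert::TryFrom;
use std::num::TryFromIntError;

/// Converts an application-side unsigned id into the signed 64-bit value
/// expected by a BIGINT column, failing instead of silently wrapping.
fn to_bigint(id: u64) -> Result<i64, TryFromIntError> {
    i64::try_from(id)
}

/// Big-endian bytes of a BIGINT value (always 8 bytes).
fn bigint_bytes(value: i64) -> [u8; 8] {
    value.to_be_bytes()
}

/// Big-endian bytes of an INT value (always 4 bytes).
fn int_bytes(value: i32) -> [u8; 4] {
    value.to_be_bytes()
}
```

Converting explicitly with try_from at the application boundary turns an out-of-range unsigned value into an error you can handle, instead of a surprising negative key in the database.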

Object Mapper

A nice feature of CDRS is the ability to automatically derive code for deserializing raw Row objects into structures. This is supported by an additional crate:

[dependencies]
...
cdrs-tokio-helpers-derive = "1.1.1"

For example, let's assume the following table for storing users of our application:

CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    login VARCHAR,
    name VARCHAR,
    emails LIST<VARCHAR>,
    created TIMESTAMP
);

In the application, you can map that to a Rust struct:

// These imports are very important, even if they
// look unused. CDRS derive macros use them and the program
// would not compile if these are omitted.
use cdrs_tokio::types::from_cdrs::FromCDRSByName;
use cdrs_tokio::frame::traits::TryFromRow;
use cdrs_tokio_helpers_derive::*;
// Needed for the `created` field below:
use chrono::{DateTime, Utc};

#[derive(Clone, Debug, TryFromRow, PartialEq)]
struct User {
    id: i64,
    login: String,
    name: String,
    emails: Vec<String>,
    created: DateTime<Utc>,
}

Now, instead of unpacking fields from the received rows manually, we can just call the auto-generated try_from_row method to convert the row into an object:

for row in result {
    let user: User = User::try_from_row(row).unwrap();
    println!("{:?}", user);
}

Note that the conversion might fail if the database schema doesn't match the struct, so in production code you need to handle the error value properly instead of just unwrapping the result blindly.
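One way to handle conversion failures without unwrap is sketched below. To keep the example self-contained, the Row type here is a hypothetical stand-in (a map from column name to text), and try_from_row is written by hand rather than derived, so only the error-handling shape carries over to CDRS.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a driver row: column name -> raw text value.
type Row = HashMap<String, String>;

#[derive(Debug, PartialEq)]
struct User {
    id: i64,
    login: String,
}

#[derive(Debug, PartialEq)]
enum RowError {
    MissingColumn(&'static str),
    BadValue(&'static str),
}

impl User {
    /// Hand-written conversion that reports what went wrong instead of panicking.
    fn try_from_row(row: &Row) -> Result<User, RowError> {
        let id = row
            .get("id")
            .ok_or(RowError::MissingColumn("id"))?
            .parse::<i64>()
            .map_err(|_| RowError::BadValue("id"))?;
        let login = row
            .get("login")
            .ok_or(RowError::MissingColumn("login"))?
            .clone();
        Ok(User { id, login })
    }
}

/// Converts all rows, returning the parsed users and the errors separately,
/// so one bad row does not abort the whole batch.
fn convert_rows(rows: &[Row]) -> (Vec<User>, Vec<RowError>) {
    let mut users = Vec::new();
    let mut errors = Vec::new();
    for row in rows {
        match User::try_from_row(row) {
            Ok(user) => users.push(user),
            Err(e) => errors.push(e),
        }
    }
    (users, errors)
}
```

Depending on the application, you might instead stop at the first error and propagate it with the ? operator.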

ScyllaDB Driver

This is a driver written by the ScyllaDB team from scratch in Rust. It is compatible with recent versions of Apache Cassandra and DataStax Enterprise. At the moment of writing this blog post, the driver is labeled as "in early development and is not for production nor officially supported."

Features include:

  • Asynchronous API based on Tokio
  • CQL protocol version 4
  • Simple, prepared and batch statements
  • Query paging
  • Token-aware routing
  • Shard-aware routing (specific to ScyllaDB)
  • Compression (LZ4 and Snappy algorithms)
  • Configurable load balancing policies
  • Driver-side metrics

Some important features like authentication and SSL are under development.

Project Setup

As with CDRS-tokio, this driver requires only a dependency entry in Cargo.toml. However, the project hasn't been published to crates.io yet, so a GitHub link must be given:

[dependencies]
scylla = { git = "https://github.com/scylladb/scylla-rust-driver", branch = "main"}
scylla-macros = { git = "https://github.com/scylladb/scylla-rust-driver", branch = "main"}


Connecting

The connection ceremony in the ScyllaDB driver is kept to the absolute minimum. There is just one builder object, SessionBuilder, responsible for configuring and obtaining the Session:

use scylla::{SessionBuilder, Session};

let node = "127.0.0.1:9042";
let session: Session = SessionBuilder::new()
    .known_node(node)
    .build()
    .await
    .expect("connect");

There isn't much configuration available yet. However, one of the things you can already set is the load balancing policy. And this time, contrary to CDRS, it is properly dynamic, so you can select it at runtime!

use scylla::transport::load_balancing::{
    RoundRobinPolicy, LoadBalancingPolicy, DCAwareRoundRobinPolicy};

// In production code, you could load it from a config file:
let local_dc = Some(String::from("DC1"));

let lb_policy: Box<dyn LoadBalancingPolicy> =
    match local_dc {
        Some(dc) => Box::new(DCAwareRoundRobinPolicy::new(dc)),
        None => Box::new(RoundRobinPolicy::new())
    };

let session: Session = SessionBuilder::new()
    .known_node(node)
    .load_balancing(lb_policy)
    .build()
    .await
    .expect("connect");
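To show what selecting a policy at runtime through a trait object buys you, here is a small self-contained sketch. The Policy trait, Node struct, and both policy types are hypothetical simplifications written for this post, not the scylla crate's types, and the real policies are considerably more involved.

```rust
/// A cluster node, identified here only by its datacenter name.
struct Node {
    dc: String,
}

// Hypothetical, simplified policy trait (not the scylla crate's trait).
trait Policy {
    /// Returns the index into `nodes` of the node to use next, if any.
    fn pick(&mut self, nodes: &[Node]) -> Option<usize>;
}

/// Cycles through all nodes in order.
struct RoundRobin {
    next: usize,
}

impl Policy for RoundRobin {
    fn pick(&mut self, nodes: &[Node]) -> Option<usize> {
        if nodes.is_empty() {
            return None;
        }
        let idx = self.next % nodes.len();
        self.next += 1;
        Some(idx)
    }
}

/// Cycles only through nodes located in the local datacenter.
struct DcAwareRoundRobin {
    local_dc: String,
    next: usize,
}

impl Policy for DcAwareRoundRobin {
    fn pick(&mut self, nodes: &[Node]) -> Option<usize> {
        let local: Vec<usize> = nodes
            .iter()
            .enumerate()
            .filter(|(_, node)| node.dc == self.local_dc)
            .map(|(i, _)| i)
            .collect();
        if local.is_empty() {
            return None;
        }
        let idx = local[self.next % local.len()];
        self.next += 1;
        Some(idx)
    }
}

/// Chooses the policy at runtime, e.g. from a value read from a config file.
fn make_policy(local_dc: Option<String>) -> Box<dyn Policy> {
    match local_dc {
        Some(dc) => Box::new(DcAwareRoundRobin { local_dc: dc, next: 0 }),
        None => Box::new(RoundRobin { next: 0 }),
    }
}
```

Because both branches return the same Box<dyn Policy> type, the code that uses the policy does not need to know which one was configured.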


Querying

Querying isn't much different from the other drivers. Prepare the statement, execute it with parameters, and get the result as a vector of Rows. Query parameters can be given as a tuple or a vector, but tuples are preferred because they accept values of different types, while vectors do not. Each returned Row contains a vector of CQLValues, which can easily be converted to the desired Rust type by calling one of the provided as_ or into_ methods. Unfortunately, the supported types are limited to integers, strings, internet addresses, lists, and sets. Maps, dates, timestamps, uuids, etc. are on the TODO list.

let statement = session
    .prepare("SELECT data FROM keyspace1.test WHERE pk = ?")
    .await
    .expect("prepare query");

let key: i64 = 10;
let result = session
    // pass parameters to the statement as a tuple
    // the comma after key looks weird, but it is actually essential to make a tuple
    // instead of a single int value
    .execute(&statement, (key,))
    .await
    .expect("execute query");

if let Some(rows) = result {
    for row in rows {
        if !row.columns.is_empty() {
            let data = row.columns[0].as_ref().unwrap().as_text().unwrap();
            println!("{}", data);
        }
    }
}
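The as_ and into_ naming follows a common Rust convention: as_ methods borrow the value, while into_ methods consume it. The sketch below mirrors that idea with a hypothetical CqlValue enum of my own (not the driver's CQLValue type) and shows how a chain of Option operations can replace the unwrap calls, so an empty row, a null column, or a column of an unexpected type yields None instead of a panic.

```rust
// Hypothetical, reduced value type used only for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum CqlValue {
    BigInt(i64),
    Text(String),
}

impl CqlValue {
    /// Borrows the value as text, if it is text.
    fn as_text(&self) -> Option<&str> {
        match self {
            CqlValue::Text(s) => Some(s.as_str()),
            _ => None,
        }
    }

    /// Copies out the integer, if the value is a bigint.
    fn as_bigint(&self) -> Option<i64> {
        match self {
            CqlValue::BigInt(v) => Some(*v),
            _ => None,
        }
    }

    /// Consumes the value and returns the owned string, if it is text.
    fn into_text(self) -> Option<String> {
        match self {
            CqlValue::Text(s) => Some(s),
            _ => None,
        }
    }
}

/// Returns the first column as text, or `None` if the row is empty,
/// the column is null, or the column is not text.
fn first_text(columns: &[Option<CqlValue>]) -> Option<&str> {
    columns.first()?.as_ref()?.as_text()
}
```

In application code you would typically turn the None case into a proper error that names the query and the column.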


Object Mapper

The ScyllaDB driver offers derive macros for automatic conversion of a Row into a user-defined Rust struct. Annotate your struct with #[derive(FromRow)] and call into_typed on the returned rows:

use scylla::cql_to_rust::FromRow;
use scylla::macros::FromRow;

#[derive(Debug, FromRow)]
struct User {
    id: i64,
    login: String,
    name: String,
    emails: Vec<String>,
}

let statement = session
    .prepare("SELECT id, login, name, emails FROM keyspace1.users WHERE id = ?")
    .await
    .expect("prepare query");

let key: i64 = 10;
let result = session
    .execute(&statement, (key,))
    .await
    .expect("execute query");

if let Some(rows) = result {
    for row in rows {
        let user: User = row.into_typed().expect("deserialize User");
        println!("{:?}", user);
    }
}

If you don't want to define a struct in some cases, into_typed can also deserialize to tuples, which is handy for receiving simple results:

if let Some(rows) = result {
    for row in rows {
        let user: (i64, String, String, Vec<String>) =
            row.into_typed().expect("deserialize User");
        println!("{:?}", user);
    }
}

Summary

We presented three different drivers. At the moment, the DataStax C++ driver with Rust bindings is the most stable, mature, and feature-rich option, even though, due to its C++ origin, it is a bit more complex to install than the other two. For now, this is our only recommendation for production-grade systems.

CDRS is the most feature-rich among the pure-Rust drivers, is easy to install, and comes in three variants: blocking, async-std, tokio. Unfortunately it has the most verbose API and has a few rough edges I've run into while writing this post. We hope the situation gets better as Rust gains popularity.

The ScyllaDB driver is the new hotness, and although at present it is a bit limited in features, it shows great potential. I like the general feel of the API and its simplicity.

Stay tuned for Part 2, where I will cover parallelism and performance!
