Using the Cassandra Bulk Loader
Bulk loading data in Cassandra has historically been difficult. Although Cassandra has had the BinaryMemtable interface from the very beginning, BinaryMemtable is hard to use and provides a relatively minor throughput improvement over normal client writes.
Cassandra 0.8.1 introduces a new tool to solve this problem: sstableloader
Using sstableloader
For the most up-to-date information, see the DataStax Community Documentation.
Overview
sstableloader is a tool that, given a set of sstable data files, streams them to a live cluster. It does not simply copy the set of sstables to every node, but only transfers the relevant part of the data to each, conforming to the replication strategy of the cluster.
There are two primary use cases for this new tool:
- Bulk loading external data into a cluster: for this you will have to first generate sstables for the data to load, as we will see later in this post.
- Loading pre-existing sstables, typically snapshots, into another cluster with different node counts or replication strategy.
Example
Let us start with the second use case to demonstrate how sstableloader is used. For that, consider the following scenario: you have a one node test cluster populated with data that you want to transfer into another, multi-node cluster.
A brute-force solution would be to copy all the sstables of the source node to every node in the multi-node destination cluster, restart each node, and then run nodetool cleanup on them. This works, but is obviously inefficient, especially if the destination cluster has a lot of nodes.
With sstableloader, you first need the sstables to be in a directory whose name is the name of the keyspace of the sstables. This is how they will be stored in either the main data directory, or a snapshot. Then, assuming sstableloader is configured to talk to your multi-node cluster:
$ ls TestKeyspace/
TestCF-g-1-Data.db TestCF-g-2-Data.db TestCF-g-3-Data.db
TestCF-g-1-Index.db TestCF-g-2-Index.db TestCF-g-3-Index.db
$ sstableloader TestKeyspace
Starting client (and waiting 30 seconds for gossip) ...
Streaming revelant part of testKeyspace/TestCF-g-1-Data.db TestKeyspace/TestCF-g-2-Data.db TestKeyspace/TestCF-g-3-Data.db to [/127.0.0.1, /127.0.0.2, /127.0.0.3]
progress: [/127.0.0.1 3/3 (100)] [/127.0.0.2 3/3 (100)] [/127.0.0.3 3/3 (100)] [total: 100 - 24MB/s (avg: 18MB/s)]
Waiting for targets to rebuild indexes ...
Configuration
To learn the topology of the cluster, the number of nodes, which ranges of keys each node is responsible for, the schema, etc., sstableloader uses the Cassandra gossip subsystem. It thus requires a directory containing a cassandra.yaml configuration file in the classpath. (If you use sstableloader from the Cassandra source tree, the cassandra.yaml file in conf will be used.)
In this config file, the listen_address, storage_port, rpc_address and rpc_port should be set correctly to communicate with the cluster, and at least one node of the cluster you want to load data in should be configured as seed. The rest is ignored for the purposes of sstableloader.
Because the sstableloader uses gossip to communicate with other nodes, if launched on the same machine that a given Cassandra node, it will need to use a different network interface than the Cassandra node. But if you want to load data from a Cassandra node, there is a simpler solution: you can use the JMX->StorageService->bulkload() call from said node.
This method simply takes the absolute path to the directory where the sstables to load are, and it will load those as sstableloader would. However, since the node running sstableloader will be both source and destination for the streaming, this will put more load on that particular node, so we advise loading data from machines that are not Cassandra nodes when loading into a live cluster.
Note that the schema for the column families to be loaded should be defined beforehand, using you prefered method: CLI, thrift or CQL.
Other considerations
- There is no requirement that the column family into which which data is loaded be empty. More generally, it is perfectly reasonable to load data into a live, active cluster.
- To get the best throughput out of the sstable loading, you will want to parallelize the creation of sstables to stream across multiple machines. There is no hard limit on the number of sstable loader that can run at the same time, so you can add additional loaders until you see no further improvement.
- At the time of this writing, sstableloader does not handle failure very well. In particular, if a node it is sending to dies, it will get stuck (a progress indicator is displayed so you will be able to tell when that happens and check if one of your node is indeed dead). Until this is fixed, if that happens, you will have to stop the loader and relaunch it. If you know that the transfer has successfully ended on some of the other nodes, you can use the -i flag to skip those nodes during the retry.
Bulk-loading external data: a complete example
The setup
If you want to bulk-load external data that is not in sstable form using sstableloader, you will have to first generate sstables. To do so, the simplest solution is the new Java class SSTableSimpleUnsortedWriter introduced in Cassandra 0.8.2. To demonstrate how it is used, let us consider the example of bulk-loading "user profile" data from a csv file. More precisely, we consider a csv file of the following form:
# uuid, firstname, lastname, password, age, email
5bd8c586-ae44-11e0-97b8-0026b0ea8cd0, Alice, Smith, asmi1975, 32, alice.smith@mail.com
4bd8cb58-ae44-12e0-a2b8-0026b0ed9cd1, Bob, Miller, af3!df8, 28, bob.miller@mail.com
1ce7cb58-ae44-12e0-a2b8-0026b0ad21ab, Carol, White, cw1845?, 49, c.white@mail.com
...
From this csv, we want to populate two column families that can have been created (using the CLI) with:
create keyspace Demo;
use Demo;
create column family Users
with key_validation_class=LexicalUUIDType
and comparator=AsciiType
and column_metadata=[
{ column_name: 'firstname', validation_class: AsciiType }
{ column_name: 'lastname', validation_class: AsciiType }
{ column_name: 'password', validation_class: AsciiType }
{ column_name: 'age', validation_class: LongType, index_type: KEYS }
{ column_name: 'email', validation_class: AsciiType }];
create column family Logins
with key_validation_class=AsciiType
and comparator=AsciiType
and column_metadata=[
{ column_name: 'password', validation_class: AsciiType },
{ column_name: 'uuid', validation_class: LexicalUUIDType }];
In other words, the column family Users will contain user profiles: the key is a uuid identifying the user, the columns are the user properties. We also added a secondary index on the 'age' property, mainly to show that this is supported by the bulk-loading process.
The second column family, Logins, associates the user email (note that this example assumes that user emails are unique) to its password and identifier. It is this column family that would typically be queried when a user login to check its credentials and allow to find its identifier to retrieve the profile data (a possibly simpler/better design would be to use a secondary index on the email column on Users. We don't do this here to show how to load multiple column families together).
Creating sstables
A complete Java example of how to create the relevant sstables from the csv file using the SSTableSimpleUnsortedWriter class can be found here.
To compile this file the Cassandra jar (>= 0.8.2) needs to be in the classpath (javac -cp <path_to>/apache-cassandra-0.8.2.jar DataImportExample.java). To run it, the Cassandra jar needs to be present as well as the jar of the librairies used by Cassandra (those in the lib/ directory of Cassandra source tree). Valid cassandra.yaml and log4j configuration files should also be accessible; typically, this means the conf/ directory of the Cassandra source tree should be in the classpath--see here for a typical launch script that sets all those. As of 0.8.2, you will need to set the data_file_directories and commitlog_directory directives in said cassandra.yaml to accessible directories, but not ones of an existing Cassandra node. (This will be fixed in 0.8.3, but in the meantime using /tmp for both is a good idea.) The only useful property you need to set up for SSTableSimpleUnsortedWriter is the partitioner you want to use.
Let us run through the important parts of this example:
- Creation of the sstable writers:
SSTableSimpleUnsortedWriter usersWriter = new SSTableSimpleUnsortedWriter(
directory,
keyspace,
"Users",
AsciiType.instance,
null,
64);
SSTableSimpleUnsortedWriter loginWriter = new SSTableSimpleUnsortedWriter(
directory,
keyspace,
"Logins",
AsciiType.instance,
null,
64);
The directory and keyspace parameters are the directory where to put the sstables (a Java File) and the keyspace of the column families (a String), respectively. Next, there are the column family name and the comparator and sub-columns comparator--here, we don't use super columns so the sub-columns comparator is null.The last parameter is a "buffer" size: sstables need to have rows sorted according to the partitioner. For RandomPartitioner, this means that row should be ordered by the MD5 of their key. Since there is no chance data will come in that order, SSTableSimpleUnsortedWriter buffers whatever input it gets in memory and "flush" everything in one sstable once the buffer is full. The buffer size is in MB (here 64MB) and actually corresponds to serialized space. That is, the resulting sstables will be approximately 64MB size. Note that the "live" size on the Java heap can be larger, so setting this parameter too large is not advisable, and in any case there is little performance advantage to use a very high value.
- Populate with each csv entry:
for (...each csv entry...)
{
ByteBuffer uuid = ByteBuffer.wrap(decompose(entry.key));
usersWriter.newRow(uuid);
usersWriter.addColumn(bytes("firstname"), bytes(entry.firstname), timestamp);
usersWriter.addColumn(bytes("lastname"), bytes(entry.lastname), timestamp);
usersWriter.addColumn(bytes("passsword"), bytes(entry.password), timestamp);
usersWriter.addColumn(bytes("age"), bytes(entry.age), timestamp);
usersWriter.addColumn(bytes("email"), bytes(entry.email), timestamp);
loginWriter.newRow(bytes(entry.email));
loginWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
loginWriter.addColumn(bytes("uuid"), uuid, timestamp);<
}
usersWriter.close();
loginWriter.close();
In this excerpt, entry is a parsed csv entry. Each call to newRow() starts a new row that is populated with the column added by addColumn(). Though not demonstrated here, it is equally simple to add super, expiring or counter columns; the exact API is described here.Note that the order of additions of rows and of columns inside rows does not matter. It is also possible to "restart" a row multiple times or to add the same column multiple times, in which case the usual conflict resolution rules between columns apply.
Finally, each writer should be closed, otherwise the resulting sstables will not be complete.
Once compiled and run with a csv file as argument, this example program will create sstables in the Demo directory. Those sstables can then be loaded into a live cluster using sstableloader as described in the previous section: sstableloader Demo/.
Other considerations
- SSTableSimpleUnsortedWriter never flushes to disk between two calls of newRow(). As a consequence, all data inserted between two of those calls must fit in memory. If you have a huge row for which this does not hold, you can call newRow() regularly, using the same row key, to avoid buffering everything.
- The methods of the simple writer expect ByteBuffers for the row key, column name and column value. Converting data to bytes is your responsibility; this is the raison d'être of the bytes() method in the example above.