Technology | December 19, 2019

DataStax Bulk Loader Part 6 - Examples for Loading From Other Locations

Earlier this week, we made both the DataStax Bulk Loader and DataStax Apache Kafka® Connector tools freely available to any developer working with open source Cassandra. What better time to conclude our DataStax Bulk Loader blog series, with a look at examples for loading from other locations. 

There are a number of cases where we want to load data from other locations into DataStax Enterprise (DSE), including another DSE cluster, DSEFS, and a relational database such as MySQL.

Example 30: Load from another DSE cluster

One common use case is moving data from one DSE cluster to another. dsbulk does not have a “migrate” method, but we can accomplish this with native load and unload support. One way to do this is to unload data from one cluster to the local file system and then load that data into the other cluster:
$ dsbulk unload -h localhost -k dsbulkblog -t iris_with_id -url /tmp/dsbulkblog/migrate
$ dsbulk load -h localhost -k dsbulkblog -t iris_with_id -url /tmp/dsbulkblog/migrate
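When the source and destination really are different clusters, the two steps can be wrapped in a small function that stops if the unload fails. This is only a minimal sketch: the function name migrate_via_files and the host names in the example call are hypothetical, it assumes bash and that dsbulk returns a non-zero exit status on failure, and it uses only the dsbulk options shown above.

```shell
#!/usr/bin/env bash
# Sketch: two-step migration between clusters via the local file system.
# migrate_via_files is a hypothetical helper; hosts and paths are placeholders.
migrate_via_files() {
  local src_host=$1 dst_host=$2 keyspace=$3 table=$4 dir=$5

  # Step 1: unload from the source cluster; stop here if it fails.
  dsbulk unload -h "$src_host" -k "$keyspace" -t "$table" -url "$dir" || return 1

  # Step 2: load the unloaded files into the destination cluster.
  dsbulk load -h "$dst_host" -k "$keyspace" -t "$table" -url "$dir"
}

# Example call (not executed here; host names are placeholders):
# migrate_via_files source-host dest-host dsbulkblog iris_with_id /tmp/dsbulkblog/migrate
```

Keeping the unloaded files around also means the load step can be retried without reading from the source cluster again.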

A nifty alternative is to do this in one go by piping the output of dsbulk unload into dsbulk load:
$ dsbulk unload -h localhost -k dsbulkblog -t iris_with_id | dsbulk load -h localhost -k dsbulkblog -t iris_with_id

This gives the following (interleaved) output:
Operation directory: /tmp/logs/LOAD_20190314-172058-173447.
Operation directory: /tmp/logs/UNLOAD_20190314-172058-192179.
total | failed | rows/s | mb/s | kb/row | p50 ms | p99ms | p999ms
  150 |      0 |    302 | 0.01 |   0.04 |  20.12 | 20.19 |  20.19
Operation UNLOAD_20190314-172058-192179 completed successfully in 0 seconds.
total | failed | rows/s | mb/s | kb/row | p50 ms | p99ms | p999ms | batches
  150 |      0 |     52 | 0.00 |   0.04 |  20.94 | 60.29 |  61.34 |    1.00
Operation LOAD_20190314-172058-173447 completed successfully in 2 seconds.
Last processed positions can be found in positions.txt

While this second way is nifty, it has one downside that we should call out: the unload and the load will both be single-threaded, since the data passes through stdout/stdin. Doing things in two steps is safer, and each step can run in parallel, but all of the unloading must finish before the loading starts.
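One more thing to watch with the piped form: by default the shell reports only the exit status of the last command in a pipeline, so a failed unload can go unnoticed. The following is a minimal sketch, assuming bash and assuming that dsbulk exits with a non-zero status on failure; migrate_via_pipe is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: piped migration that fails if either side of the pipe fails.
# migrate_via_pipe is a hypothetical helper; arguments are placeholders.
migrate_via_pipe() {
  local src_host=$1 dst_host=$2 keyspace=$3 src_table=$4 dst_table=$5
  (
    # pipefail makes the pipeline return non-zero if any stage fails.
    # The subshell keeps the option from leaking into the caller.
    set -o pipefail
    dsbulk unload -h "$src_host" -k "$keyspace" -t "$src_table" |
      dsbulk load -h "$dst_host" -k "$keyspace" -t "$dst_table"
  )
}

# Example call (not executed here):
# migrate_via_pipe localhost localhost dsbulkblog iris_with_id iris_with_id
```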

Example 30.1: Migrating while preserving TTLs and Timestamps

Sometimes we want to migrate the data while preserving the writetime and the TTL. We can do that via custom CQL queries with dsbulk. First, we unload the data with the writetime and TTL information:

$ dsbulk unload -h localhost -query "SELECT id, petal_length, WRITETIME(petal_length) AS w_petal_length, TTL(petal_length) AS l_petal_length, petal_width, WRITETIME(petal_width) AS w_petal_width, TTL(petal_width) AS l_petal_width, sepal_length, WRITETIME(sepal_length) AS w_sepal_length, TTL(sepal_length) AS l_sepal_length, sepal_width, WRITETIME(sepal_width) AS w_sepal_width, TTL(sepal_width) AS l_sepal_width, species, WRITETIME(species) AS w_species, TTL(species) AS l_species FROM dsbulkblog.iris_with_id" -url /tmp/dsbulkblog/migrate
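Typing out a query like this by hand gets tedious for wide tables. As a rough sketch, the same SELECT can be generated from a list of column names. The helper name build_unload_query is hypothetical, it assumes a single key column, and it follows the w_ and l_ alias convention used above.

```shell
#!/usr/bin/env bash
# Sketch: build the WRITETIME/TTL unload query for a table.
# Usage: build_unload_query KEYSPACE.TABLE KEY_COLUMN REGULAR_COLUMN...
# build_unload_query is a hypothetical helper; it assumes one key column.
build_unload_query() {
  local table=$1 key=$2 col
  shift 2
  local select_list=$key
  for col in "$@"; do
    # Each regular column is selected along with its writetime and TTL.
    select_list="$select_list, $col, WRITETIME($col) AS w_$col, TTL($col) AS l_$col"
  done
  printf 'SELECT %s FROM %s\n' "$select_list" "$table"
}

# Example call:
# build_unload_query dsbulkblog.iris_with_id id petal_length petal_width sepal_length sepal_width species
```

The output can be passed straight to dsbulk with -query "$(build_unload_query ...)".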

The next thing to do is to load. We’re going to accomplish this by using batch statements. The reason is that we can only set one writetime and one TTL per INSERT statement. So, we will do one INSERT statement per regular column (petal_length, petal_width, sepal_length, sepal_width, and species). We will put all of these INSERTs into the same batch all keyed on the same partition key, namely id, so these will be collapsed in Cassandra into a single mutation and applied atomically:

$ dsbulk load -h localhost -query "BEGIN BATCH INSERT INTO dsbulkblog.iris_with_id(id, petal_length) VALUES (:id, :petal_length)
USING TIMESTAMP :w_petal_length AND TTL :l_petal_length; INSERT INTO dsbulkblog.iris_with_id(id, petal_width) VALUES (:id, :petal_width)
USING TIMESTAMP :w_petal_width AND TTL :l_petal_width; INSERT INTO dsbulkblog.iris_with_id(id, sepal_length) VALUES (:id, :sepal_length)
USING TIMESTAMP :w_sepal_length AND TTL :l_sepal_length; INSERT INTO dsbulkblog.iris_with_id(id, sepal_width) VALUES (:id, :sepal_width)
USING TIMESTAMP :w_sepal_width AND TTL :l_sepal_width; INSERT INTO dsbulkblog.iris_with_id(id, species) VALUES (:id, :species)
USING TIMESTAMP :w_species AND TTL :l_species; APPLY BATCH;" -url /tmp/dsbulkblog/migrate --batch.mode DISABLED
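The batch statement follows the same pattern for every regular column, so it can also be generated. This is a sketch under the same assumptions as the load command above (a single key column, and w_ and l_ prefixes for the writetime and TTL fields); build_load_batch is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: build the BATCH of per-column INSERTs that preserves writetime and TTL.
# Usage: build_load_batch KEYSPACE.TABLE KEY_COLUMN REGULAR_COLUMN...
# build_load_batch is a hypothetical helper; it assumes one key column.
build_load_batch() {
  local table=$1 key=$2 col
  shift 2
  local batch="BEGIN BATCH"
  for col in "$@"; do
    # One INSERT per regular column, each with its own timestamp and TTL.
    batch="$batch INSERT INTO $table($key, $col) VALUES (:$key, :$col) USING TIMESTAMP :w_$col AND TTL :l_$col;"
  done
  printf '%s APPLY BATCH;\n' "$batch"
}

# Example call:
# build_load_batch dsbulkblog.iris_with_id id petal_length petal_width sepal_length sepal_width species
```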

Example 30.2: Migrating within the same cluster

The above examples can be used when the source and destination clusters are the same. One good example is if you need to change the schema of the table and it already has data in it. While adding (or removing) a regular column is simple and supported in CQL, changing the primary key (partition key or clustering columns) is not supported. To accomplish that, you need to copy the data into a new table, something that is also not supported in CQL.

For example, let’s take our iris data and change the schema to make the species the partition key and the ID column the clustering column. We can do that via CQL:
$ cqlsh -e "CREATE TABLE dsbulkblog.iris_by_species(id INT, petal_length DOUBLE, petal_width DOUBLE, sepal_length DOUBLE, sepal_width DOUBLE, species TEXT, PRIMARY KEY ((species), id));"

Now, we can unload the data from the iris_with_id table and load that data into iris_by_species table:
$ dsbulk unload -h localhost -k dsbulkblog -t iris_with_id | dsbulk load -h localhost -k dsbulkblog -t iris_by_species

We can spot-check that the data has migrated by comparing the results of a dsbulk count:
$ dsbulk count -k dsbulkblog -t iris_with_id -stats global --log.verbosity 0
150
$ dsbulk count -k dsbulkblog -t iris_by_species -stats global --log.verbosity 0
150
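The comparison can be scripted so that a mismatch shows up as a non-zero exit status. This is a minimal sketch, assuming the count is the only thing dsbulk prints to stdout with the options shown above; same_row_count is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch: compare the row counts of two tables in the same keyspace.
# same_row_count is a hypothetical helper.
# Returns 0 if the counts match, 1 if they differ, 2 if a count fails.
same_row_count() {
  local keyspace=$1 table_a=$2 table_b=$3 count_a count_b
  count_a=$(dsbulk count -k "$keyspace" -t "$table_a" -stats global --log.verbosity 0) || return 2
  count_b=$(dsbulk count -k "$keyspace" -t "$table_b" -stats global --log.verbosity 0) || return 2
  echo "$table_a=$count_a $table_b=$count_b"
  [ "$count_a" = "$count_b" ]
}

# Example call (not executed here):
# same_row_count dsbulkblog iris_with_id iris_by_species
```

Matching counts are only a spot check: they do not prove that the column values themselves were copied correctly.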

Example 31: Loading from DSEFS

One common use case to consider is how to load data from DSEFS. dsbulk does not have native support for loading from DSEFS URLs. That said, there are a couple of ways we can address this. To demonstrate, on a cluster with Analytics enabled (and thus DSEFS enabled), let’s first put some data in DSEFS:
$ dse fs "mkdir dsefs:///dsbulkblog"
$ dse fs "cp file:///tmp/dsbulkblog/iris.csv dsefs:///dsbulkblog/"

We can see that the data is now in there with:
$ dse fs "cat dsefs:///dsbulkblog/iris.csv"

Now, we can use that same command and pipe it into dsbulk:
$ dse fs "cat dsefs:///dsbulkblog/iris.csv" | dsbulk load -k dsbulkblog -t iris_with_id

We could also do this with the WebHDFS interface to DSEFS:
$ dsbulk load -k dsbulkblog -t iris_with_id -url "http://localhost:5598/webhdfs/v1/dsbulkblog/iris.csv?op=OPEN"

Example 32: Loading from MySQL

In a similar fashion, we can migrate data from an RDBMS to DSE by having the RDBMS command-line client write the data to stdout and piping that into dsbulk. For MySQL, for example, it would look something like this (Note: MySQL will export in tab-delimited form, and -N keeps it from printing a header row of column names, which matches -header false on the dsbulk side):
$ mysql --user=mysqluser --password=mypassword -B -N --execute="SELECT id, petal_length, petal_width, sepal_length, sepal_width, species FROM mydb.iris_with_id" | dsbulk load -k dsbulkblog -t iris_with_id -delim "\t" -header false -m "id,petal_length,petal_width,sepal_length,sepal_width,species"
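Before loading an export like this, it can help to confirm that every row has the expected number of tab-separated fields. The following is a rough sketch using awk; check_tsv_fields is a hypothetical helper name, and the file path in the example is a placeholder.

```shell
#!/usr/bin/env bash
# Sketch: verify that every line on stdin has the expected number of
# tab-separated fields. check_tsv_fields is a hypothetical helper.
# Problems are reported on stderr; exit status is 1 if any line is off.
check_tsv_fields() {
  local expected=$1
  awk -F '\t' -v n="$expected" '
    NF != n {
      printf "line %d: %d fields, expected %d\n", NR, NF, n > "/dev/stderr"
      bad = 1
    }
    END { exit bad }
  '
}

# Example (not executed here): check a saved export before loading it.
# check_tsv_fields 6 < /tmp/dsbulkblog/iris.tsv
```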

Wrap-up

As part of our ongoing support of the Cassandra community, we were very excited to announce that the DataStax Apache Kafka™ Connector and DataStax Bulk Loader are now freely available. Read about the free tools in this blog post.

If you want to learn more about DataStax Bulk Loader, see the previous blog posts in this series.
