TechnologyApril 9, 2019

DataStax Bulk Loader Pt. 2 — More Loading

DataStax Bulk Loader Pt. 2 — More Loading

In the last blog post, we introduced the dsbulk command, some basic loading examples, and dove into some mappings. In this blog post, we are going to look into some additional elements for data loading.


Example 5: Comments

dsbulk allows for you to have comments in your input file.  By default there is no comment character. The iris_with_comment.csv file has a comment at the beginning. We want to ignore this line, so we call dsbulk like:

$ dsbulk load -url /tmp/dsbulkblog/iris_with_comment.csv -k dsbulkblog -t iris_with_id -comment "#"

Example 6: Nulls

Sometimes our data has nulls in it. By default, empty strings will insert into DSE as NULL values. The iris_with_null.csv file has empty strings for the species. We can load this using the default settings. First, let us truncate the table:

$ cqlsh -e "TRUNCATE dsbulkblog.iris_with_id;"

Now, let’s load data:

$ dsbulk load -url /tmp/dsbulkblog/iris_with_nulls.csv -k dsbulkblog -t iris_with_id

To see that we have NULL for the species, we can do a quick SELECT on the iris_with_id table:

$ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"
id  | petal_length | petal_width | sepal_length | sepal_width | species
-----+--------------+-------------+--------------+-------------+---------  
23 |          1.7 | 0.5 |          5.1 | 3.3 | null
114|          5.1 | 2.4 |          5.8 | 2.8 | null  
53 |            4 | 1.3 |          5.5 | 2.3 | null

(3 rows)

We can actually choose to convert those empty values into non-NULL values by specifying the --connector.csv.nullValue parameter. For example, if we have an empty species we could set the value to IRIS:

$ cqlsh -e "TRUNCATE dsbulkblog.iris_with_id;"
$ dsbulk load -url /tmp/dsbulkblog/iris_with_nulls.csv -k dsbulk
blog -t iris_with_id --connector.csv.nullValue "IRIS"

Operation directory: /tmp/logs/LOAD_20190320-190450-807527

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches 
150 |      0 | 346 | 0.01 |   0.04 | 11.49 | 39.58 |  41.42 | 1.00

Operation LOAD_20190320-190450-807527 completed successfully in 0 seconds.
Last processed positions can be found in positions.txt $
cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"

id  | petal_length | petal_width | sepal_length | sepal_width | species

-----+--------------+-------------+--------------+-------------+---------  

23 |          1.7 | 0.5 |          5.1 | 3.3 | IRIS
114|          5.1 | 2.4 |          5.8 | 2.8 | IRIS 
53 |            4 | 1.3 |          5.5 | 2.3 | IRIS

(3 rows)

Example 6.1: Null strings

Of course, sometimes the data comes with a string used for NULL, such as in the iris_with_null_string.csv file, which uses the string NULL for a NULL value. If we load that file with the defaults, like this:

$ dsbulk load -url /tmp/dsbulkblog/iris_with_null_string.csv -k dsbulkblog -t iris_with_id

We would not get what we expect:

$ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"

id  | petal_length | petal_width | sepal_length | sepal_width | species

-----+--------------+-------------+--------------+-------------+---------  

23 |          1.7 | 0.5 |          5.1 | 3.3 | NULL
114|          5.1 | 2.4 |          5.8 | 2.8 | NULL 

53 |            4 | 1.3 |          5.5 | 2.3 | NULL (3 rows)

Notice that we have the string “NULL”, not the NULL value.

We can load that with nulls by using the -nullStrings parameter. First, let us truncate the table:

$ cqlsh -e "TRUNCATE dsbulkblog.iris_with_id;"

Now, let’s use the -nullStrings parameter:

$ dsbulk load -url /tmp/dsbulkblog/iris_with_null_string.csv -k dsbulkblog -t iris_with_id -nullStrings "NULL"

Which would get us what we expect:

$ cqlsh -e "SELECT * FROM dsbulkblog.iris_with_id LIMIT 3;"

id  | petal_length | petal_width | sepal_length | sepal_width | species

-----+--------------+-------------+--------------+-------------+---------  

23 |          1.7 | 0.5 |          5.1 | 3.3 | null
114|          5.1 | 2.4 |          5.8 | 2.8 | null 

53 |            4 | 1.3 |          5.5 | 2.3 | null (3 rows)

Notice that we now have the NULL value, not the string “NULL”.  

Example 6.2: Unset versus NULL

In DSE 5.0 (or Cassandra 3.0), we introduced the ability to specify the field of an INSERT as “unset”.  Prior to this, we would have to specify a missing value as a NULL, which would do two things:

  1. In the case where this INSERT is an UPDATE or overwrite, we would be deleting any data that was in those fields.
  2. In the case where this is an INSERT of a new record, we would actually be inserting a NULL, which DSE will consider a tombstone, which may become problematic.

dsbulk can treat input null data as either NULL or as unset.  The default is to treat them as unset, but we can specify that we want them to be loaded as nulls with the --schema.nullToUnset parameter:

$ dsbulk load -url /tmp/dsbulkblog/iris_with_null_string.csv -k dsbulkblog -t iris_with_id
-nullStrings "NULL" --schema.nullToUnset false

Note that despite the fact that dsbulk only supports DSE 5.0 and later, some users have loaded to DSE 4.8.  To do this, you have to set --schema.nullToUnset false explicitly since versions of DSE prior to DSE 5.0 did not support unset.

Example 7: Delimiters

Not all data will be delimited by commas, but that is the default delimiter for dsbulk.  For example, the president_birthdates.psv file is delimited by pipes.  We can load that via:

$ dsbulk load -url /tmp/dsbulkblog/president_birthdates.psv -k dsbulkblog -t president_birthdates -delim "|"

The -delim parameter is a shortcut for --connector.csv.delimiter.

Example 8: Date Formats

You may notice that one of the records failed to load. We can see that by looking in the mapping-errors.log:

Resource: file:/tmp/dsbulkblog/president_birthdates.psv
Position: 6

Source: John Quincy Adams|July 11, 1767|1767.07.11\u000a
java.time.format.DateTimeParseException: Text '1767.07.11' could not be parsed at index 4       
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)        
      at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1819)        
      at com.datastax.dsbulk.engine.internal.codecs.util.SimpleTemporalFormat.parse(SimpleTemporalFormat.java:41)       
at com.datastax.dsbulk.engine.internal.codecs.string.StringToTemporalCodec.parseTemporalAccessor(StringToTemporalCodec.java:39)       
at com.datastax.dsbulk.engine.internal.codecs.string.StringToLocalDateCodec.externalToInternal(StringToLocalDateCodec.java:30)

This is because for John Quincy Adams we used a different date format. As we discussed before, the bad line is stored in the mapping.bad file. We can load this file with a different date format, like:

$ dsbulk load -url /tmp/logs/LOAD_20190314-164545-280529/mapping.bad -k dsbulkblog -t president_birthdates -delim "|" -header false -m "president,birthdate_string,birthdate" --codec.date "y.M.d"

Note a few things here.  First, the location of the mapping.bad file is specific to this run.  It will likely have a different path, and you would see this on your failed run that produced this “bad file”.  Second, we need to specify the correct date format. We do this by creating a string per the Java DateFormatter, and pass that in using the --codec.date parameter.  Third, the bad file does not have a header, so we need to provide it, which requires 2 things:

  1. We need to tell dsbulk that there is no header: -header false
  2. We need to provide the mapping, which in this case would be: -m "president,birthdate_string,birthdate"

As an aside, we could get fancy with Linux things for the mapping and grab the header line from the original president_birthdates.psv file (though we need to replace the “|” with “,”):

$ dsbulk load -url /tmp/logs/LOAD_20190314-164545-280529/mapping.bad -k dsbulkblog -t president_birthdates -delim "|" -header false -m `head -1 /tmp/dsbulkblog/president_birthdates.psv | sed 's/|/,/g'` --codec.date "y.M.d"

Example 9: CurrentDate(), Now(), etc

Sometimes we wish to populate the data with the current date or timestamp. CQL allows you to call the CurrentDate() function (as well as Now(), CurrentTimestamp(), etc). while doing an INSERT. If we wanted to do this, we could do it by using the -query command. For example, if we wanted to set the birthdate of all presidents to today (just as a contrived example), we could do so via:

$ dsbulk load -url /tmp/dsbulkblog/president_birthdates.psv -delim "|" -query "INSERT INTO dsbulkblog.president_birthdates(president,birthdate,birthdate_string) VALUES (:president, CurrentDate(), :birthdate_string)"

Example 8.1: Using mappings for CurrentDate(), Now(), etc

We can do this using the --schema.mapping, as well:

$ dsbulk load -url /tmp/dsbulkblog/president_birthdates.psv -k dsbulkblog -t president_birthdates -delim "|" -m "president=president, CurrentDate()=birthdate, birthdate_string=birthdate_string"

Example 10: Collections

dsbulk can load data into collection and columns with user-defined types (UDTs). It should be noted that dsbulk replaces the existing collection, and does not add to the existing collection. The format for data in collections is as JSON. For example, this is the first data line in sportsteams.csv:

"Baseball Teams","[\"Boston Red Sox\", \"Atlanta Braves\", \"San Diego Padres\", \"Washington Senators\"]"

We do not need to do anything in particular to load this into dsbulkblog.categories_list:

$ dsbulk load -url /tmp/dsbulkblog/sportsteams.csv -k dsbulkblog -t categories_list

We can load this exact same file into the dsbulkblog.categories_set table, as well:

$ dsbulk load -url /tmp/dsbulkblog/sportsteams.csv -k dsbulkblog -t categories_set

One thing to note is that the JSON for the collection must be quoted, and the individual elements in the JSON array must also be quoted. However, the string column category does not need to be in quotes. Here is the full sportsteams.csv file:

category,examples
"Baseball Teams","[\"Boston Red Sox\",\"Atlanta Braves\",\"San Diego Padres\",\"Washington Senators\"]"

"Football Teams","[\"New England Patriots\",\"Seattle Seahawks\",\"Detroit Lions\",\"Chicago Bears\"]"
"Soccer Teams","[\"New England Revolution\",\"New York Red Bulls\",\"D. C. United\",\"Chicago Fire Soccer Club\"]"
"B
asketball Teams","[\"Boston Celtics\",\"Los Angeles Lakers\",\"Chicago Bulls\",\"Detroit Pistons\"]"
"Hockey Teams","[\"Boston Bruins\",\"Philadelphia Flyers\",\"Washington Capitals\",\"Edmonton Oilers\"]"

One note on collections. While the JSON for the collection must be quoted, you must not have any spaces between the delimiter and the quoted JSON string, or leverage the --connector.csv.ignoreLeadingWhitespaces parameter, which we discussed in Example 3.7.

Example 11: TTLs and Timestamps

Example 11.1: TTLs

We can set the TTL for the load by specifying the --schema.queryTtl parameter:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id --schema.queryTtl 3600

Example 11.2: Timestamps

You can set the Timestamp similarly with --schema.queryTimestamp:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id --schema.queryTimestamp "2018-01-01T12:34:56+00:00"

Example 11.3: TTL from input data

Sometimes we wish to set the TTL or writetime for the data based on the input data itself. dsbulk allows for this in two ways. First, we can do this with the special __ttl column in the --schema.mapping parameter, such as:

$ cat /tmp/dsbulkblog/iris_no_header.csv | awk -F, '{printf("%s,%d\n", $0, $6+10000);}' | dsbulk load -k dsbulkblog -t iris_with_id -m "sepal_length,sepal_width,petal_length,petal_width,species,id,__ttl" -header false

Here we are using awk to add an extra column which will be the TTL value (which we set to the value of the id column plus 10000). We can check that the TTL is set via cqlsh:

$ cqlsh -e "SELECT id, species, Ttl(species) FROM dsbulkblog.iris_with_id LIMIT 5"

id | species         | ttl(species)

-----+-----------------+--------------  

23 |     Iris-setosa |         9986
114|  Iris-virginica |        10078 

53 | Iris-versicolor |        10016
110|  Iris-virginica |        10074 
91 | Iris-versicolor |        10054

(5 rows)

Example 11.4: TTL from input data using a custom query

We can also do this with a custom query:

$ cat /tmp/dsbulkblog/iris_no_header.csv | awk -F, 'BEGIN{printf("sepal_length,sepal_width,petal_length,petal_width,species,id,ttl_to_use\n");} {printf("%s,%d\n", $0, $6+10000);}' | dsbulk load -query "INSERT INTO dsbulkblog.iris_with_id(sepal_length,sepal_width,petal_length,petal_width,species,id) VALUES (:sepal_length,:sepal_width,:petal_length,:petal_width,:species,:id) USING TTL :ttl_to_use"

Here we need to provide a header as the USING TTL clause needs to bind to a named variable.

A similar operation could be done to set the timestamp of the write, as well.

Example 12: Dry Run

Sometimes you want to validate your data and parameters prior to actually loading. To provide for this, dsbulk has a -dryRun option, which is short for --engine.dryRun. The default is false. To do a dry run:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id -dryRun true

The output will be something like this:

Operation directory: /tmp/logs/LOAD_20190314-165538-413122
Dry-run mode enabled.

total | failed | rows/s | mb/s | kb/row | p50ms | p99ms | p999ms | batches  

150 |      0 | 0 | 0.00 |   0.00 | 0.00 | 0.00 |   0.00 | 1.00

Operation LOAD_20190314-165538-413122 completed successfully in 0 seconds.
Last processed positions can be found in positions.txt

We can then check the log directory (here it is /tmp/logs/LOAD_20190314-165538-413122) and check for any mapping errors or other validation errors.  This run was successful, so we only see an operation.log and a positions.txt file.

Example 13: Controlling the rate and in-flight requests

We can control the number of writes per second and the number of writes “in flight” at any given time via the Executor options.

Example 13.1: Controlling the rate

We can limit the number of writes per second with the --executor.maxPerSecond parameter:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id --executor.maxPerSecond 10

Example 13.2: Controlling the number of in-flight requests

Similarly, we can limit the number of outstanding writes at any given time with the --executor.maxInFlight parameter:

$ dsbulk load -url /tmp/dsbulkblog/iris.csv -k dsbulkblog -t iris_with_id --executor.maxInFlight 3


To download the DataStax Bulk Loader click here.

To delve into some of the common options to load, unloading, and counting, read the next Bulk Loader blog here.

For an into to basic loading examples read the previous Bulk Loader blog here.

dsbulk DataStax Enterprise

One-Stop Data API for Production GenAI

Astra DB gives developers a complete data API and out-of-the-box integrations that make it easier to build production RAG apps with high relevancy and low latency.